Commit e08cc205 authored by zhoushiguang

Modified portions of the flink-sql-connector-kafka source code

parent ba9ce0e2
Modified class in the flink-sql-connector-kafka_2.11-1.14.3 sources:
org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource
The GROUP_OFFSETS case was changed to:
case GROUP_OFFSETS:
    // kafkaSourceBuilder.setStartingOffsets(OffsetsInitializer.committedOffsets());
    String offsetResetConfig =
            properties.getProperty(
                    ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
                    OffsetResetStrategy.NONE.name());
    OffsetResetStrategy offsetResetStrategy = getResetStrategy(offsetResetConfig);
    kafkaSourceBuilder.setStartingOffsets(
            OffsetsInitializer.committedOffsets(offsetResetStrategy));
    break;

together with the new private helper added to KafkaDynamicSource:

private OffsetResetStrategy getResetStrategy(String offsetResetConfig) {
    return Arrays.stream(OffsetResetStrategy.values())
            .filter(ors -> ors.name().equals(offsetResetConfig.toUpperCase(Locale.ROOT)))
            .findAny()
            .orElseThrow(
                    () ->
                            new IllegalArgumentException(
                                    String.format(
                                            "%s can not be set to %s. Valid values: [%s]",
                                            ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
                                            offsetResetConfig,
                                            Arrays.stream(OffsetResetStrategy.values())
                                                    .map(Enum::name)
                                                    .map(String::toLowerCase)
                                                    .collect(Collectors.joining(",")))));
}
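For context, a minimal sketch of how the change surfaces at the SQL level (not part of the commit). With the stock 1.14.3 connector, 'scan.startup.mode' = 'group-offsets' always used OffsetsInitializer.committedOffsets(), i.e. OffsetResetStrategy.NONE, so a job whose consumer group has no committed offsets fails at startup; with this patch the connector falls back to the strategy configured via properties.auto.offset.reset. The topic, broker address, group id, and class name below are placeholders.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class GroupOffsetsFallbackExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());
        // With the patched connector, 'group-offsets' start-up no longer fails when the
        // consumer group has no committed offsets; it falls back to auto.offset.reset.
        tEnv.executeSql(
                "CREATE TABLE demo_source (\n"
                        + "  id STRING,\n"
                        + "  ts TIMESTAMP(3)\n"
                        + ") WITH (\n"
                        + "  'connector' = 'kafka',\n"
                        + "  'topic' = 'demo-topic',\n"                            // placeholder
                        + "  'properties.bootstrap.servers' = 'localhost:9092',\n" // placeholder
                        + "  'properties.group.id' = 'demo-group',\n"              // placeholder
                        + "  'scan.startup.mode' = 'group-offsets',\n"
                        + "  'properties.auto.offset.reset' = 'latest',\n"         // honored by this patch
                        + "  'format' = 'json'\n"
                        + ")");
        tEnv.executeSql("SELECT * FROM demo_source").print();
    }
}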
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<!-- <parent>-->
<!-- <artifactId>wj-datacenter-platform</artifactId>-->
<!-- <groupId>net.wanji</groupId>-->
<!-- <version>1.0-SNAPSHOT</version>-->
<!-- </parent>-->
<modelVersion>4.0.0</modelVersion>
<groupId>org.apache.flink</groupId>
<artifactId>flink-sql-connector-kafka</artifactId>
<version>1.1.0</version>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.14.3</flink.version>
<scala.binary.version>2.11</scala.binary.version>
<jackson.version>2.13.2</jackson.version>
<elasticsearch.version>7.10.1</elasticsearch.version>
<es.client.version>7.10.1</es.client.version>
<!-- suffix tag appended to the built jar name -->
<!--<jar.tail>all</jar.tail>-->
<!--<jar.tail>batch-track</jar.tail>-->
<!--<jar.tail>export</jar.tail>-->
<!--<jar.tail>monitor</jar.tail>-->
<!--<jar.tail>ped</jar.tail>-->
<!--<jar.tail>event</jar.tail>-->
<!--<jar.tail>event</jar.tail>-->
<jar.tail>all</jar.tail>
<!--<jar.tail>area</jar.tail>-->
<!--<jar.tail>route</jar.tail>-->
<!--<jar.tail>route</jar.tail>
<jar.tail>event</jar.tail>
<jar.tail>route</jar.tail>-->
<!--<jar.tail>ped</jar.tail>-->
<!--<jar.tail>period</jar.tail>-->
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.11</artifactId>
<version>1.14.3</version>
</dependency>
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
<version>1.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch7_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>7.10.1</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>7.10.1</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-tx</artifactId>
<version>5.0.7.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>5.0.7.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context-support</artifactId>
<version>5.0.7.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jdbc</artifactId>
<version>5.0.7.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-beans</artifactId>
<version>5.0.7.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-aop</artifactId>
<version>5.0.7.RELEASE</version>
</dependency>
<dependency>
<groupId>javax.transaction</groupId>
<artifactId>jta</artifactId>
<version>1.1</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpasyncclient</artifactId>
<version>4.1.4</version>
</dependency>
<dependency>
<groupId>com.vividsolutions</groupId>
<artifactId>jts</artifactId>
<version>1.13</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.google.protobuf/protobuf-java -->
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.19.4</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.orbitz.consul/consul-client -->
<dependency>
<groupId>com.orbitz.consul</groupId>
<artifactId>consul-client</artifactId>
<version>1.5.3</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.ecwid.consul/consul-api -->
<!--<dependency>
<groupId>com.ecwid.consul</groupId>
<artifactId>consul-api</artifactId>
<version>1.4.5</version>
</dependency>-->
<!-- <dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>compile</scope>
</dependency>-->
<!-- Flink Java API dependency -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Flink Java streaming (real-time computation) dependency -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- official Flink Kafka connector dependency -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-sql-connector-mysql-cdc</artifactId>
<version>2.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-sql-connector-kafka_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/com.getindata/flink-http-connector -->
<!--<dependency>
<groupId>com.getindata</groupId>
<artifactId>flink-http-connector</artifactId>
<version>0.9.0</version>
</dependency>-->
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-jdbc -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_2.11</artifactId>
<version>1.14.3</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.0</version>
</dependency>
<!-- Alibaba fastjson dependency -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.79</version>
</dependency>
<!-- Redis client dependency -->
<!--<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>4.1.1</version>
</dependency>-->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-reload4j</artifactId>
<version>1.7.36</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.17.2</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>2.17.2</version>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>42.2.23</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.yaml/snakeyaml -->
<dependency>
<groupId>org.yaml</groupId>
<artifactId>snakeyaml</artifactId>
<version>1.30</version>
</dependency>
<!--<dependency>
<groupId>com.github.ulisesbocchio</groupId>
<artifactId>jasypt-spring-boot-starter</artifactId>
<version>2.1.0</version>
</dependency>-->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.22</version>
<scope>provided</scope>
</dependency>
<!-- pin Jackson to resolve the jackson-databind 2.12.0 vulnerability pulled in by the consul client -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jdk8</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-guava</artifactId>
<version>${jackson.version}</version>
</dependency>
<!-- MyBatisPlus -->
<!--<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>3.1.2</version>
</dependency>-->
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-generator</artifactId>
<version>3.0.7.1</version>
</dependency>
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-extension</artifactId>
<version>3.1.2</version>
</dependency>
<dependency>
<groupId>org.mybatis</groupId>
<artifactId>mybatis-typehandlers-jsr310</artifactId>
<version>1.0.2</version>
</dependency>
<!--<dependency>
<groupId>org.mybatis</groupId>
<artifactId>mybatis-spring</artifactId>
<version>2.0.6</version>
</dependency>
<dependency>
<groupId>org.mybatis</groupId>
<artifactId>mybatis</artifactId>
<version>3.5.7</version>
</dependency>-->
<!--<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-freemarker</artifactId>
<version>2.2.13.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-thymeleaf</artifactId>
<version>2.2.13.RELEASE</version>
</dependency>-->
<dependency>
<groupId>org.apache.velocity</groupId>
<artifactId>velocity</artifactId>
<version>1.7</version>
</dependency>
<dependency>
<groupId>org.freemarker</groupId>
<artifactId>freemarker</artifactId>
<version>2.3.28</version>
</dependency>
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<version>2.9.9</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.9</version>
</dependency>
<!-- Alibaba Druid Spring Boot Starter dependency -->
<!--<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
<version>1.1.1</version>
</dependency>-->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>1.1.2</version>
</dependency>
<!-- MySQL JDBC driver -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.48</version>
</dependency>
<dependency>
<groupId>org.asynchttpclient</groupId>
<artifactId>async-http-client</artifactId>
<version>2.8.1</version>
</dependency>
<dependency>
<groupId>net.tascalate</groupId>
<artifactId>net.tascalate.concurrent</artifactId>
<version>0.8.0</version>
</dependency>
<dependency>
<groupId>net.wanji</groupId>
<artifactId>wj-common</artifactId>
<version>0.0.2</version>
<exclusions>
<exclusion>
<artifactId>commons-lang</artifactId>
<groupId>commons-lang</groupId>
</exclusion>
<exclusion>
<artifactId>javassist</artifactId>
<groupId>org.javassist</groupId>
</exclusion>
<exclusion>
<artifactId>jsr305</artifactId>
<groupId>com.google.code.findbugs</groupId>
</exclusion>
<exclusion>
<artifactId>lz4-java</artifactId>
<groupId>org.lz4</groupId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<!--<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.4.2</version>
<configuration>
<skipTests>true</skipTests>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>2.6</version>
<configuration>
&lt;!&ndash;<skipTests>true</skipTests>&ndash;&gt;
<archive>
<manifest>
&lt;!&ndash;<mainClass>com.wanji.indicators.task.track.stream.TrackAndRouteMain</mainClass>&ndash;&gt;
<addClasspath>true</addClasspath>
<classpathPrefix>lib/</classpathPrefix>
</manifest>
<manifestEntries>
<Class-Path>./</Class-Path>
</manifestEntries>
</archive>
<excludes>
<exclude>bin/**</exclude>
<exclude>redis/**</exclude>
<exclude>logger/**</exclude>
&lt;!&ndash;<exclude>**/*.properties</exclude>&ndash;&gt;
<exclude>**/*.yml</exclude>
<exclude>**/*.txt</exclude>
</excludes>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
&lt;!&ndash;<skipTests>true</skipTests>&ndash;&gt;
&lt;!&ndash; not append assembly id in release file name &ndash;&gt;
<appendAssemblyId>false</appendAssemblyId>
<descriptors>
<descriptor>src/main/assembly/package.xml</descriptor>
</descriptors>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>-->
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration combine.self="override">
<createDependencyReducedPom>false</createDependencyReducedPom>
<shadedArtifactAttached>false</shadedArtifactAttached>
<finalName>${project.artifactId}-${jar.tail}-${project.version}</finalName>
<filters>
<!-- Globally exclude log4j.properties from our JAR files. -->
<filter>
<artifact>*</artifact>
<excludes>
<!--<exclude>log4j.properties</exclude>-->
<exclude>log4j-test.properties</exclude>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<artifactSet>
<excludes>
<exclude>org.slf4j:slf4j-log4j12</exclude>
<exclude>log4j:log4j</exclude>
</excludes>
</artifactSet>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>reference.conf</resource>
</transformer>
<!-- The service transformer is needed to merge META-INF/services files -->
<transformer
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer"/>
<transformer
implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>META-INF/spring.handlers</resource>
</transformer>
<transformer
implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>META-INF/spring.schemas</resource>
</transformer>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.wanji.indicators.task.track.stream.TrackUnionMain</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
<execution>
<!--
Disable inherited shade-flink to prevent the Shade plugin from changing the project.basedir. The basedir
is changed by the Shade plugin when dependencyReducedPomLocation is set to a different location than the
original basedir. We do that in the root pom.xml.
-->
<id>shade-flink</id>
<phase>none</phase>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
package org.apache.flink.connector.kafka.source.enumerator.initializer;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.TopicPartition;
import java.io.Serializable;
import java.util.Collection;
import java.util.Map;
@PublicEvolving
public interface OffsetsInitializer extends Serializable {
Map<TopicPartition, Long> getPartitionOffsets(Collection<TopicPartition> paramCollection, PartitionOffsetsRetriever paramPartitionOffsetsRetriever);
OffsetResetStrategy getAutoOffsetResetStrategy();
static OffsetsInitializer committedOffsets() {
return committedOffsets(OffsetResetStrategy.NONE);
}
static OffsetsInitializer committedOffsets(OffsetResetStrategy offsetResetStrategy) {
// -3L is KafkaPartitionSplit.COMMITTED_OFFSET: the reader resolves the group's committed offset.
return new ReaderHandledOffsetsInitializer(-3L, offsetResetStrategy);
}
static OffsetsInitializer timestamp(long timestamp) {
return new TimestampOffsetsInitializer(timestamp);
}
static OffsetsInitializer earliest() {
// -2L is KafkaPartitionSplit.EARLIEST_OFFSET.
return new ReaderHandledOffsetsInitializer(-2L, OffsetResetStrategy.EARLIEST);
}
static OffsetsInitializer latest() {
// -1L is KafkaPartitionSplit.LATEST_OFFSET.
return new ReaderHandledOffsetsInitializer(-1L, OffsetResetStrategy.LATEST);
}
static OffsetsInitializer offsets(Map<TopicPartition, Long> offsets) {
return new SpecifiedOffsetsInitializer(offsets, OffsetResetStrategy.EARLIEST);
}
static OffsetsInitializer offsets(Map<TopicPartition, Long> offsets, OffsetResetStrategy offsetResetStrategy) {
return new SpecifiedOffsetsInitializer(offsets, offsetResetStrategy);
}
public static interface PartitionOffsetsRetriever {
Map<TopicPartition, Long> committedOffsets(Collection<TopicPartition> param1Collection);
Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> param1Collection);
Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> param1Collection);
Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> param1Map);
}
}
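As a usage illustration (not part of the commit), a minimal DataStream-level sketch that builds the same initializer the patched GROUP_OFFSETS branch now produces: committed offsets with a fallback reset strategy. A regular application imports the unshaded org.apache.kafka.clients.consumer.OffsetResetStrategy rather than the shaded package used inside this jar; broker, topic, group id, and class name are placeholders.

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;

public class CommittedOffsetsFallbackExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        KafkaSource<String> source =
                KafkaSource.<String>builder()
                        .setBootstrapServers("localhost:9092") // placeholder
                        .setTopics("demo-topic")               // placeholder
                        .setGroupId("demo-group")              // placeholder
                        // Same initializer the patched GROUP_OFFSETS branch builds: start from the
                        // group's committed offsets, falling back to LATEST when none are committed.
                        .setStartingOffsets(
                                OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
                        .setValueOnlyDeserializer(new SimpleStringSchema())
                        .build();
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source").print();
        env.execute("committed-offsets-fallback-example");
    }
}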
package org.apache.flink.connector.kafka.source.enumerator.initializer;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.TopicPartition;
import org.apache.flink.util.Preconditions;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
class ReaderHandledOffsetsInitializer implements OffsetsInitializer, OffsetsInitializerValidator {
private static final long serialVersionUID = 172938052008787981L;
private final long startingOffset;
private final OffsetResetStrategy offsetResetStrategy;
ReaderHandledOffsetsInitializer(long startingOffset, OffsetResetStrategy offsetResetStrategy) {
this.startingOffset = startingOffset;
this.offsetResetStrategy = offsetResetStrategy;
}
public Map<TopicPartition, Long> getPartitionOffsets(Collection<TopicPartition> partitions, PartitionOffsetsRetriever partitionOffsetsRetriever) {
Map<TopicPartition, Long> initialOffsets = new HashMap<>();
for (TopicPartition tp : partitions)
initialOffsets.put(tp, Long.valueOf(this.startingOffset));
return initialOffsets;
}
public OffsetResetStrategy getAutoOffsetResetStrategy() {
return this.offsetResetStrategy;
}
public void validate(Properties kafkaSourceProperties) {
if (this.startingOffset == -3L)
Preconditions.checkState(kafkaSourceProperties
.containsKey("group.id"),
String.format("Property %s is required when using committed offset for offsets initializer", new Object[] { "group.id" }));
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connector.kafka.source.enumerator.initializer;
import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.TopicPartition;
import java.util.*;
import static org.apache.flink.util.Preconditions.checkState;
/**
* An implementation of {@link OffsetsInitializer} which initializes the offsets of the partition
* according to the user specified offsets.
*
* <p>Package private and should be instantiated via {@link OffsetsInitializer}.
*/
class SpecifiedOffsetsInitializer implements OffsetsInitializer, OffsetsInitializerValidator {
private static final long serialVersionUID = 1649702397250402877L;
private final Map<TopicPartition, Long> initialOffsets;
private final OffsetResetStrategy offsetResetStrategy;
SpecifiedOffsetsInitializer(
Map<TopicPartition, Long> initialOffsets, OffsetResetStrategy offsetResetStrategy) {
this.initialOffsets = Collections.unmodifiableMap(initialOffsets);
this.offsetResetStrategy = offsetResetStrategy;
}
@Override
public Map<TopicPartition, Long> getPartitionOffsets(
Collection<TopicPartition> partitions,
PartitionOffsetsRetriever partitionOffsetsRetriever) {
Map<TopicPartition, Long> offsets = new HashMap<>();
List<TopicPartition> toLookup = new ArrayList<>();
for (TopicPartition tp : partitions) {
Long offset = initialOffsets.get(tp);
if (offset == null) {
toLookup.add(tp);
} else {
offsets.put(tp, offset);
}
}
if (!toLookup.isEmpty()) {
// First check the committed offsets.
Map<TopicPartition, Long> committedOffsets =
partitionOffsetsRetriever.committedOffsets(toLookup);
offsets.putAll(committedOffsets);
toLookup.removeAll(committedOffsets.keySet());
switch (offsetResetStrategy) {
case EARLIEST:
offsets.putAll(partitionOffsetsRetriever.beginningOffsets(toLookup));
break;
case LATEST:
offsets.putAll(partitionOffsetsRetriever.endOffsets(toLookup));
break;
default:
throw new IllegalStateException(
"Cannot find initial offsets for partitions: " + toLookup);
}
}
return offsets;
}
@Override
public OffsetResetStrategy getAutoOffsetResetStrategy() {
return offsetResetStrategy;
}
@Override
public void validate(Properties kafkaSourceProperties) {
initialOffsets.forEach(
(tp, offset) -> {
if (offset == KafkaPartitionSplit.COMMITTED_OFFSET) {
checkState(
kafkaSourceProperties.containsKey(ConsumerConfig.GROUP_ID_CONFIG),
String.format(
"Property %s is required because partition %s is initialized with committed offset",
ConsumerConfig.GROUP_ID_CONFIG, tp));
}
});
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.flink.util.Collector;
import java.io.Serializable;
/**
* The deserialization schema describes how to turn the Kafka ConsumerRecords into data types
* (Java/Scala objects) that are processed by Flink.
*
* @param <T> The type created by the keyed deserialization schema.
*/
@PublicEvolving
public interface KafkaDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
/**
* Initialization method for the schema. It is called before the actual working methods {@link
* #deserialize} and thus suitable for one time setup work.
*
* <p>The provided {@link DeserializationSchema.InitializationContext} can be used to access
* additional features such as e.g. registering user metrics.
*
* @param context Contextual information that can be used during initialization.
*/
default void open(DeserializationSchema.InitializationContext context) throws Exception {}
/**
* Method to decide whether the element signals the end of the stream. If true is returned the
* element won't be emitted.
*
* @param nextElement The element to test for the end-of-stream signal.
* @return True, if the element signals end of stream, false otherwise.
*/
boolean isEndOfStream(T nextElement);
/**
* Deserializes the Kafka record.
*
* @param record Kafka record to be deserialized.
* @return The deserialized message as an object (null if the message cannot be deserialized).
*/
T deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception;
/**
* Deserializes the Kafka record.
*
* <p>Can output multiple records through the {@link Collector}. Note that number and size of
* the produced records should be relatively small. Depending on the source implementation
* records can be buffered in memory or collecting records might delay emitting checkpoint
* barrier.
*
* @param message The message, as a byte array.
* @param out The collector to put the resulting messages.
*/
default void deserialize(ConsumerRecord<byte[], byte[]> message, Collector<T> out)
throws Exception {
T deserialized = deserialize(message);
if (deserialized != null) {
out.collect(deserialized);
}
}
}
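To illustrate the contract documented above (again, not part of the commit), a minimal KafkaDeserializationSchema implementation that exposes the record value as a UTF-8 string. The class name is illustrative, and a user application imports the unshaded Kafka ConsumerRecord rather than the shaded one referenced in this jar.

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;

import java.nio.charset.StandardCharsets;

public class ValueAsStringDeserializationSchema implements KafkaDeserializationSchema<String> {
    private static final long serialVersionUID = 1L;

    @Override
    public boolean isEndOfStream(String nextElement) {
        return false; // the stream is unbounded; no end-of-stream marker
    }

    @Override
    public String deserialize(ConsumerRecord<byte[], byte[]> record) {
        // Tombstones (null values) map to null and are dropped by the default collector variant above.
        return record.value() == null ? null : new String(record.value(), StandardCharsets.UTF_8);
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return Types.STRING;
    }
}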
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;
import javax.annotation.Nullable;
import java.io.Serializable;
/**
* A {@link KafkaSerializationSchema} defines how to serialize values of type {@code T} into {@link
* ProducerRecord ProducerRecords}.
*
* <p>Please also implement {@link KafkaContextAware} if your serialization schema needs information
* about the available partitions and the number of parallel subtasks along with the subtask ID on
* which the Kafka Producer is running.
*
* @param <T> the type of values being serialized
*/
@PublicEvolving
public interface KafkaSerializationSchema<T> extends Serializable {
/**
* Initialization method for the schema. It is called before the actual working methods {@link
* #serialize(Object, Long)} and thus suitable for one time setup work.
*
* <p>The provided {@link SerializationSchema.InitializationContext} can be used to access
* additional features such as e.g. registering user metrics.
*
* @param context Contextual information that can be used during initialization.
*/
default void open(SerializationSchema.InitializationContext context) throws Exception {}
/**
* Serializes given element and returns it as a {@link ProducerRecord}.
*
* @param element element to be serialized
* @param timestamp timestamp (can be null)
* @return Kafka {@link ProducerRecord}
*/
ProducerRecord<byte[], byte[]> serialize(T element, @Nullable Long timestamp);
}
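A matching minimal sketch for the producer side (illustrative only): a KafkaSerializationSchema that writes the element as the record value and leaves the key null so Kafka's default partitioner picks the partition. The class name and target topic are placeholders.

import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

import javax.annotation.Nullable;
import java.nio.charset.StandardCharsets;

public class StringValueSerializationSchema implements KafkaSerializationSchema<String> {
    private static final long serialVersionUID = 1L;
    private final String topic; // placeholder target topic, supplied by the caller

    public StringValueSerializationSchema(String topic) {
        this.topic = topic;
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(String element, @Nullable Long timestamp) {
        // Value-only record; the key stays null so Kafka's default partitioner picks the partition.
        return new ProducerRecord<>(topic, element.getBytes(StandardCharsets.UTF_8));
    }
}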
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.connectors.kafka.table;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.types.DeserializationException;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;
import javax.annotation.Nullable;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
/** A specific {@link KafkaDeserializationSchema} for {@link KafkaDynamicSource}. */
class DynamicKafkaDeserializationSchema implements KafkaDeserializationSchema<RowData> {
private static final long serialVersionUID = 1L;
private final @Nullable DeserializationSchema<RowData> keyDeserialization;
private final DeserializationSchema<RowData> valueDeserialization;
private final boolean hasMetadata;
private final BufferingCollector keyCollector;
private final OutputProjectionCollector outputCollector;
private final TypeInformation<RowData> producedTypeInfo;
private final boolean upsertMode;
DynamicKafkaDeserializationSchema(
int physicalArity,
@Nullable DeserializationSchema<RowData> keyDeserialization,
int[] keyProjection,
DeserializationSchema<RowData> valueDeserialization,
int[] valueProjection,
boolean hasMetadata,
MetadataConverter[] metadataConverters,
TypeInformation<RowData> producedTypeInfo,
boolean upsertMode) {
if (upsertMode) {
Preconditions.checkArgument(
keyDeserialization != null && keyProjection.length > 0,
"Key must be set in upsert mode for deserialization schema.");
}
this.keyDeserialization = keyDeserialization;
this.valueDeserialization = valueDeserialization;
this.hasMetadata = hasMetadata;
this.keyCollector = new BufferingCollector();
this.outputCollector =
new OutputProjectionCollector(
physicalArity,
keyProjection,
valueProjection,
metadataConverters,
upsertMode);
this.producedTypeInfo = producedTypeInfo;
this.upsertMode = upsertMode;
}
@Override
public void open(DeserializationSchema.InitializationContext context) throws Exception {
if (keyDeserialization != null) {
keyDeserialization.open(context);
}
valueDeserialization.open(context);
}
@Override
public boolean isEndOfStream(RowData nextElement) {
return false;
}
@Override
public RowData deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
throw new IllegalStateException("A collector is required for deserializing.");
}
@Override
public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<RowData> collector)
throws Exception {
// shortcut in case no output projection is required,
// also not for a cartesian product with the keys
if (keyDeserialization == null && !hasMetadata) {
valueDeserialization.deserialize(record.value(), collector);
return;
}
// buffer key(s)
if (keyDeserialization != null) {
keyDeserialization.deserialize(record.key(), keyCollector);
}
// project output while emitting values
outputCollector.inputRecord = record;
outputCollector.physicalKeyRows = keyCollector.buffer;
outputCollector.outputCollector = collector;
if (record.value() == null && upsertMode) {
// collect tombstone messages in upsert mode by hand
outputCollector.collect(null);
} else {
valueDeserialization.deserialize(record.value(), outputCollector);
}
keyCollector.buffer.clear();
}
@Override
public TypeInformation<RowData> getProducedType() {
return producedTypeInfo;
}
// --------------------------------------------------------------------------------------------
interface MetadataConverter extends Serializable {
Object read(ConsumerRecord<?, ?> record);
}
// --------------------------------------------------------------------------------------------
private static final class BufferingCollector implements Collector<RowData>, Serializable {
private static final long serialVersionUID = 1L;
private final List<RowData> buffer = new ArrayList<>();
@Override
public void collect(RowData record) {
buffer.add(record);
}
@Override
public void close() {
// nothing to do
}
}
// --------------------------------------------------------------------------------------------
/**
* Emits a row with key, value, and metadata fields.
*
* <p>The collector is able to handle the following kinds of keys:
*
* <ul>
* <li>No key is used.
* <li>A key is used.
* <li>The deserialization schema emits multiple keys.
* <li>Keys and values have overlapping fields.
* <li>Keys are used and value is null.
* </ul>
*/
private static final class OutputProjectionCollector
implements Collector<RowData>, Serializable {
private static final long serialVersionUID = 1L;
private final int physicalArity;
private final int[] keyProjection;
private final int[] valueProjection;
private final MetadataConverter[] metadataConverters;
private final boolean upsertMode;
private transient ConsumerRecord<?, ?> inputRecord;
private transient List<RowData> physicalKeyRows;
private transient Collector<RowData> outputCollector;
OutputProjectionCollector(
int physicalArity,
int[] keyProjection,
int[] valueProjection,
MetadataConverter[] metadataConverters,
boolean upsertMode) {
this.physicalArity = physicalArity;
this.keyProjection = keyProjection;
this.valueProjection = valueProjection;
this.metadataConverters = metadataConverters;
this.upsertMode = upsertMode;
}
@Override
public void collect(RowData physicalValueRow) {
// no key defined
if (keyProjection.length == 0) {
emitRow(null, (GenericRowData) physicalValueRow);
return;
}
// otherwise emit a value for each key
for (RowData physicalKeyRow : physicalKeyRows) {
emitRow((GenericRowData) physicalKeyRow, (GenericRowData) physicalValueRow);
}
}
@Override
public void close() {
// nothing to do
}
private void emitRow(
@Nullable GenericRowData physicalKeyRow,
@Nullable GenericRowData physicalValueRow) {
final RowKind rowKind;
if (physicalValueRow == null) {
if (upsertMode) {
rowKind = RowKind.DELETE;
} else {
throw new DeserializationException(
"Invalid null value received in non-upsert mode. Could not to set row kind for output record.");
}
} else {
rowKind = physicalValueRow.getRowKind();
}
final int metadataArity = metadataConverters.length;
final GenericRowData producedRow =
new GenericRowData(rowKind, physicalArity + metadataArity);
for (int keyPos = 0; keyPos < keyProjection.length; keyPos++) {
assert physicalKeyRow != null;
producedRow.setField(keyProjection[keyPos], physicalKeyRow.getField(keyPos));
}
if (physicalValueRow != null) {
for (int valuePos = 0; valuePos < valueProjection.length; valuePos++) {
producedRow.setField(
valueProjection[valuePos], physicalValueRow.getField(valuePos));
}
}
for (int metadataPos = 0; metadataPos < metadataArity; metadataPos++) {
producedRow.setField(
physicalArity + metadataPos,
metadataConverters[metadataPos].read(inputRecord));
}
outputCollector.collect(producedRow);
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.connectors.kafka.table;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.KafkaSourceBuilder;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.TopicPartition;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.header.Header;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.MetadataConverter;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.source.DataStreamScanProvider;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
import org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown;
import org.apache.flink.table.data.GenericMapData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.utils.DataTypeUtils;
import org.apache.flink.util.Preconditions;
import javax.annotation.Nullable;
import java.util.*;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
/** A version-agnostic Kafka {@link ScanTableSource}. */
@Internal
public class KafkaDynamicSource
implements ScanTableSource, SupportsReadingMetadata, SupportsWatermarkPushDown {
// --------------------------------------------------------------------------------------------
// Mutable attributes
// --------------------------------------------------------------------------------------------
/** Data type that describes the final output of the source. */
protected DataType producedDataType;
/** Metadata that is appended at the end of a physical source row. */
protected List<String> metadataKeys;
/** Watermark strategy that is used to generate per-partition watermark. */
protected @Nullable WatermarkStrategy<RowData> watermarkStrategy;
// --------------------------------------------------------------------------------------------
// Format attributes
// --------------------------------------------------------------------------------------------
private static final String VALUE_METADATA_PREFIX = "value.";
/** Data type to configure the formats. */
protected final DataType physicalDataType;
/** Optional format for decoding keys from Kafka. */
protected final @Nullable DecodingFormat<DeserializationSchema<RowData>> keyDecodingFormat;
/** Format for decoding values from Kafka. */
protected final DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat;
/** Indices that determine the key fields and the target position in the produced row. */
protected final int[] keyProjection;
/** Indices that determine the value fields and the target position in the produced row. */
protected final int[] valueProjection;
/** Prefix that needs to be removed from fields when constructing the physical data type. */
protected final @Nullable String keyPrefix;
// --------------------------------------------------------------------------------------------
// Kafka-specific attributes
// --------------------------------------------------------------------------------------------
/** The Kafka topics to consume. */
protected final List<String> topics;
/** The Kafka topic pattern to consume. */
protected final Pattern topicPattern;
/** Properties for the Kafka consumer. */
protected final Properties properties;
/**
* The startup mode for the contained consumer (default is {@link StartupMode#GROUP_OFFSETS}).
*/
protected final StartupMode startupMode;
/**
* Specific startup offsets; only relevant when startup mode is {@link
* StartupMode#SPECIFIC_OFFSETS}.
*/
protected final Map<KafkaTopicPartition, Long> specificStartupOffsets;
/**
* The start timestamp to locate partition offsets; only relevant when startup mode is {@link
* StartupMode#TIMESTAMP}.
*/
protected final long startupTimestampMillis;
/** Flag to determine source mode. In upsert mode, it will keep the tombstone message. */
protected final boolean upsertMode;
protected final String tableIdentifier;
public KafkaDynamicSource(
DataType physicalDataType,
@Nullable DecodingFormat<DeserializationSchema<RowData>> keyDecodingFormat,
DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat,
int[] keyProjection,
int[] valueProjection,
@Nullable String keyPrefix,
@Nullable List<String> topics,
@Nullable Pattern topicPattern,
Properties properties,
StartupMode startupMode,
Map<KafkaTopicPartition, Long> specificStartupOffsets,
long startupTimestampMillis,
boolean upsertMode,
String tableIdentifier) {
// Format attributes
this.physicalDataType =
Preconditions.checkNotNull(
physicalDataType, "Physical data type must not be null.");
this.keyDecodingFormat = keyDecodingFormat;
this.valueDecodingFormat =
Preconditions.checkNotNull(
valueDecodingFormat, "Value decoding format must not be null.");
this.keyProjection =
Preconditions.checkNotNull(keyProjection, "Key projection must not be null.");
this.valueProjection =
Preconditions.checkNotNull(valueProjection, "Value projection must not be null.");
this.keyPrefix = keyPrefix;
// Mutable attributes
this.producedDataType = physicalDataType;
this.metadataKeys = Collections.emptyList();
this.watermarkStrategy = null;
// Kafka-specific attributes
Preconditions.checkArgument(
(topics != null && topicPattern == null)
|| (topics == null && topicPattern != null),
"Either Topic or Topic Pattern must be set for source.");
this.topics = topics;
this.topicPattern = topicPattern;
this.properties = Preconditions.checkNotNull(properties, "Properties must not be null.");
this.startupMode =
Preconditions.checkNotNull(startupMode, "Startup mode must not be null.");
this.specificStartupOffsets =
Preconditions.checkNotNull(
specificStartupOffsets, "Specific offsets must not be null.");
this.startupTimestampMillis = startupTimestampMillis;
this.upsertMode = upsertMode;
this.tableIdentifier = tableIdentifier;
}
@Override
public ChangelogMode getChangelogMode() {
return valueDecodingFormat.getChangelogMode();
}
@Override
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
final DeserializationSchema<RowData> keyDeserialization =
createDeserialization(context, keyDecodingFormat, keyProjection, keyPrefix);
final DeserializationSchema<RowData> valueDeserialization =
createDeserialization(context, valueDecodingFormat, valueProjection, null);
final TypeInformation<RowData> producedTypeInfo =
context.createTypeInformation(producedDataType);
final KafkaSource<RowData> kafkaSource =
createKafkaSource(keyDeserialization, valueDeserialization, producedTypeInfo);
return new DataStreamScanProvider() {
@Override
public DataStream<RowData> produceDataStream(StreamExecutionEnvironment execEnv) {
if (watermarkStrategy == null) {
watermarkStrategy = WatermarkStrategy.noWatermarks();
}
return execEnv.fromSource(
kafkaSource, watermarkStrategy, "KafkaSource-" + tableIdentifier);
}
@Override
public boolean isBounded() {
return kafkaSource.getBoundedness() == Boundedness.BOUNDED;
}
};
}
@Override
public Map<String, DataType> listReadableMetadata() {
final Map<String, DataType> metadataMap = new LinkedHashMap<>();
// according to convention, the order of the final row must be
// PHYSICAL + FORMAT METADATA + CONNECTOR METADATA
// where the format metadata has highest precedence
// add value format metadata with prefix
valueDecodingFormat
.listReadableMetadata()
.forEach((key, value) -> metadataMap.put(VALUE_METADATA_PREFIX + key, value));
// add connector metadata
Stream.of(ReadableMetadata.values())
.forEachOrdered(m -> metadataMap.putIfAbsent(m.key, m.dataType));
return metadataMap;
}
@Override
public void applyReadableMetadata(List<String> metadataKeys, DataType producedDataType) {
// separate connector and format metadata
final List<String> formatMetadataKeys =
metadataKeys.stream()
.filter(k -> k.startsWith(VALUE_METADATA_PREFIX))
.collect(Collectors.toList());
final List<String> connectorMetadataKeys = new ArrayList<>(metadataKeys);
connectorMetadataKeys.removeAll(formatMetadataKeys);
// push down format metadata
final Map<String, DataType> formatMetadata = valueDecodingFormat.listReadableMetadata();
if (formatMetadata.size() > 0) {
final List<String> requestedFormatMetadataKeys =
formatMetadataKeys.stream()
.map(k -> k.substring(VALUE_METADATA_PREFIX.length()))
.collect(Collectors.toList());
valueDecodingFormat.applyReadableMetadata(requestedFormatMetadataKeys);
}
this.metadataKeys = connectorMetadataKeys;
this.producedDataType = producedDataType;
}
@Override
public boolean supportsMetadataProjection() {
return false;
}
@Override
public void applyWatermark(WatermarkStrategy<RowData> watermarkStrategy) {
this.watermarkStrategy = watermarkStrategy;
}
@Override
public DynamicTableSource copy() {
final KafkaDynamicSource copy =
new KafkaDynamicSource(
physicalDataType,
keyDecodingFormat,
valueDecodingFormat,
keyProjection,
valueProjection,
keyPrefix,
topics,
topicPattern,
properties,
startupMode,
specificStartupOffsets,
startupTimestampMillis,
upsertMode,
tableIdentifier);
copy.producedDataType = producedDataType;
copy.metadataKeys = metadataKeys;
copy.watermarkStrategy = watermarkStrategy;
return copy;
}
@Override
public String asSummaryString() {
return "Kafka table source";
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final KafkaDynamicSource that = (KafkaDynamicSource) o;
return Objects.equals(producedDataType, that.producedDataType)
&& Objects.equals(metadataKeys, that.metadataKeys)
&& Objects.equals(physicalDataType, that.physicalDataType)
&& Objects.equals(keyDecodingFormat, that.keyDecodingFormat)
&& Objects.equals(valueDecodingFormat, that.valueDecodingFormat)
&& Arrays.equals(keyProjection, that.keyProjection)
&& Arrays.equals(valueProjection, that.valueProjection)
&& Objects.equals(keyPrefix, that.keyPrefix)
&& Objects.equals(topics, that.topics)
&& Objects.equals(String.valueOf(topicPattern), String.valueOf(that.topicPattern))
&& Objects.equals(properties, that.properties)
&& startupMode == that.startupMode
&& Objects.equals(specificStartupOffsets, that.specificStartupOffsets)
&& startupTimestampMillis == that.startupTimestampMillis
&& Objects.equals(upsertMode, that.upsertMode)
&& Objects.equals(tableIdentifier, that.tableIdentifier)
&& Objects.equals(watermarkStrategy, that.watermarkStrategy);
}
@Override
public int hashCode() {
return Objects.hash(
producedDataType,
metadataKeys,
physicalDataType,
keyDecodingFormat,
valueDecodingFormat,
keyProjection,
valueProjection,
keyPrefix,
topics,
topicPattern,
properties,
startupMode,
specificStartupOffsets,
startupTimestampMillis,
upsertMode,
tableIdentifier,
watermarkStrategy);
}
// --------------------------------------------------------------------------------------------
protected KafkaSource<RowData> createKafkaSource(
DeserializationSchema<RowData> keyDeserialization,
DeserializationSchema<RowData> valueDeserialization,
TypeInformation<RowData> producedTypeInfo) {
final KafkaDeserializationSchema<RowData> kafkaDeserializer =
createKafkaDeserializationSchema(
keyDeserialization, valueDeserialization, producedTypeInfo);
final KafkaSourceBuilder<RowData> kafkaSourceBuilder = KafkaSource.builder();
if (topics != null) {
kafkaSourceBuilder.setTopics(topics);
} else {
kafkaSourceBuilder.setTopicPattern(topicPattern);
}
switch (startupMode) {
case EARLIEST:
kafkaSourceBuilder.setStartingOffsets(OffsetsInitializer.earliest());
break;
case LATEST:
kafkaSourceBuilder.setStartingOffsets(OffsetsInitializer.latest());
break;
case GROUP_OFFSETS:
//kafkaSourceBuilder.setStartingOffsets(OffsetsInitializer.committedOffsets());
String offsetResetConfig =
properties.getProperty(
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
OffsetResetStrategy.NONE.name());
OffsetResetStrategy offsetResetStrategy = getResetStrategy(offsetResetConfig);
kafkaSourceBuilder.setStartingOffsets(
OffsetsInitializer.committedOffsets(offsetResetStrategy));
break;
case SPECIFIC_OFFSETS:
Map<TopicPartition, Long> offsets = new HashMap<>();
specificStartupOffsets.forEach(
(tp, offset) ->
offsets.put(
new TopicPartition(tp.getTopic(), tp.getPartition()),
offset));
kafkaSourceBuilder.setStartingOffsets(OffsetsInitializer.offsets(offsets));
break;
case TIMESTAMP:
kafkaSourceBuilder.setStartingOffsets(
OffsetsInitializer.timestamp(startupTimestampMillis));
break;
}
kafkaSourceBuilder
.setProperties(properties)
.setDeserializer(KafkaRecordDeserializationSchema.of(kafkaDeserializer));
return kafkaSourceBuilder.build();
}
private KafkaDeserializationSchema<RowData> createKafkaDeserializationSchema(
DeserializationSchema<RowData> keyDeserialization,
DeserializationSchema<RowData> valueDeserialization,
TypeInformation<RowData> producedTypeInfo) {
final MetadataConverter[] metadataConverters =
metadataKeys.stream()
.map(
k ->
Stream.of(ReadableMetadata.values())
.filter(rm -> rm.key.equals(k))
.findFirst()
.orElseThrow(IllegalStateException::new))
.map(m -> m.converter)
.toArray(MetadataConverter[]::new);
// check if connector metadata is used at all
final boolean hasMetadata = metadataKeys.size() > 0;
// adjust physical arity with value format's metadata
final int adjustedPhysicalArity =
producedDataType.getChildren().size() - metadataKeys.size();
// adjust value format projection to include value format's metadata columns at the end
final int[] adjustedValueProjection =
IntStream.concat(
IntStream.of(valueProjection),
IntStream.range(
keyProjection.length + valueProjection.length,
adjustedPhysicalArity))
.toArray();
return new DynamicKafkaDeserializationSchema(
adjustedPhysicalArity,
keyDeserialization,
keyProjection,
valueDeserialization,
adjustedValueProjection,
hasMetadata,
metadataConverters,
producedTypeInfo,
upsertMode);
}
private @Nullable DeserializationSchema<RowData> createDeserialization(
Context context,
@Nullable DecodingFormat<DeserializationSchema<RowData>> format,
int[] projection,
@Nullable String prefix) {
if (format == null) {
return null;
}
DataType physicalFormatDataType =
DataTypeUtils.projectRow(this.physicalDataType, projection);
if (prefix != null) {
physicalFormatDataType = DataTypeUtils.stripRowPrefix(physicalFormatDataType, prefix);
}
return format.createRuntimeDecoder(context, physicalFormatDataType);
}
// --------------------------------------------------------------------------------------------
// Metadata handling
// --------------------------------------------------------------------------------------------
enum ReadableMetadata {
TOPIC(
"topic",
DataTypes.STRING().notNull(),
new MetadataConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object read(ConsumerRecord<?, ?> record) {
return StringData.fromString(record.topic());
}
}),
PARTITION(
"partition",
DataTypes.INT().notNull(),
new MetadataConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object read(ConsumerRecord<?, ?> record) {
return record.partition();
}
}),
HEADERS(
"headers",
// key and value of the map are nullable to make handling easier in queries
DataTypes.MAP(DataTypes.STRING().nullable(), DataTypes.BYTES().nullable())
.notNull(),
new MetadataConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object read(ConsumerRecord<?, ?> record) {
final Map<StringData, byte[]> map = new HashMap<>();
for (Header header : record.headers()) {
map.put(StringData.fromString(header.key()), header.value());
}
return new GenericMapData(map);
}
}),
LEADER_EPOCH(
"leader-epoch",
DataTypes.INT().nullable(),
new MetadataConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object read(ConsumerRecord<?, ?> record) {
return record.leaderEpoch().orElse(null);
}
}),
OFFSET(
"offset",
DataTypes.BIGINT().notNull(),
new MetadataConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object read(ConsumerRecord<?, ?> record) {
return record.offset();
}
}),
TIMESTAMP(
"timestamp",
DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).notNull(),
new MetadataConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object read(ConsumerRecord<?, ?> record) {
return TimestampData.fromEpochMillis(record.timestamp());
}
}),
TIMESTAMP_TYPE(
"timestamp-type",
DataTypes.STRING().notNull(),
new MetadataConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object read(ConsumerRecord<?, ?> record) {
return StringData.fromString(record.timestampType().toString());
}
});
final String key;
final DataType dataType;
final MetadataConverter converter;
ReadableMetadata(String key, DataType dataType, MetadataConverter converter) {
this.key = key;
this.dataType = dataType;
this.converter = converter;
}
}
private OffsetResetStrategy getResetStrategy(String offsetResetConfig) {
return Arrays.stream(OffsetResetStrategy.values())
.filter(ors -> ors.name().equals(offsetResetConfig.toUpperCase(Locale.ROOT)))
.findAny()
.orElseThrow(
() ->
new IllegalArgumentException(
String.format(
"%s can not be set to %s. Valid values: [%s]",
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
offsetResetConfig,
Arrays.stream(OffsetResetStrategy.values())
.map(Enum::name)
.map(String::toLowerCase)
.collect(Collectors.joining(",")))));
}
}
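To show how the ReadableMetadata keys defined in KafkaDynamicSource surface in SQL (illustrative, not part of the commit), a table that reads the partition, offset, and timestamp metadata columns; topic, brokers, group id, and class name are placeholders.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class KafkaMetadataColumnsExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());
        // The METADATA columns below map to the ReadableMetadata keys defined in KafkaDynamicSource.
        tEnv.executeSql(
                "CREATE TABLE demo_with_metadata (\n"
                        + "  id STRING,\n"
                        + "  `partition` INT METADATA VIRTUAL,\n"
                        + "  `offset` BIGINT METADATA VIRTUAL,\n"
                        + "  ts TIMESTAMP_LTZ(3) METADATA FROM 'timestamp' VIRTUAL\n"
                        + ") WITH (\n"
                        + "  'connector' = 'kafka',\n"
                        + "  'topic' = 'demo-topic',\n"                            // placeholder
                        + "  'properties.bootstrap.servers' = 'localhost:9092',\n" // placeholder
                        + "  'properties.group.id' = 'demo-group',\n"              // placeholder
                        + "  'scan.startup.mode' = 'earliest-offset',\n"
                        + "  'format' = 'json'\n"
                        + ")");
        tEnv.executeSql("SELECT id, `partition`, `offset`, ts FROM demo_with_metadata").print();
    }
}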