基于chunjun纯钧的增量数据同步问题排查【博客园-实习小生】 全球今日讯
发布时间:2023-04-12 15:29:15 来源:博客园


(资料图)

基于chunjun纯钧的增量数据同步

目前我司的大数据平台使用的是flink技术栈,底层的连接器插件使用的是国产的chunjun插件,在使用chunjun的过程中也遇到了很多问题,本次记录下在SQL模式的情况下怎么支持增量的数据同步

chunjun的官网文档对增量同步已经做出了一定的说明

纯钧官方根据文档我编写了一个SQL脚本

create table `source` (        `sfzh` STRING COMMENT "",        `xm` STRING COMMENT "",        `xb` STRING COMMENT "",        `xbdm` STRING COMMENT "",        `jzdz` STRING COMMENT "",        `fzrq` DATE COMMENT "",        `dsc_biz_record_id` STRING COMMENT "") with (        "connector" = "mysql-x",        "url" = "jdbc:mysql://:/?useSSL=false&useInformationSchema=true&nullCatalogMeansCurrent=true",        "table-name" = "",        "username" = "",        "password" = "",        "scan.fetch-size" = "1024",        "scan.increment.column" = "fzrq",        --"scan.increment.column-type" = "date",        "scan.start-location" = "1659974400000");create table `sink` (        `sfzh` STRING COMMENT "",        `xm` STRING COMMENT "",        `xb` STRING COMMENT "",        `xbdm` STRING COMMENT "",        `jzdz` STRING COMMENT "",        `fzrq` DATE COMMENT "",        `dsc_biz_record_id` STRING COMMENT "",        PRIMARY KEY (`dsc_biz_record_id`) NOT ENFORCED) with (        "connector" = "stream-x");

然后提交任务的时候发现已经记录了start-locationstart-location的指标信息了,但是并没有上报到Prometheus!

在本地调试源码解决问题的大致过程

在类 com.dtstack.chunjun.source.format.BaseRichInputFormat中有一个成员变量

/** 自定义的prometheus reporter,用于提交startLocation和endLocation指标 */protected transient CustomReporter customReporter;

该变量是用来提交增量信息的对象,flink任务在开始的时候会执行一下方法

@Override    public void openInputFormat() throws IOException {        Map vars = getRuntimeContext().getMetricGroup().getAllVariables();        if (vars != null) {            jobName = vars.getOrDefault(Metrics.JOB_NAME, "defaultJobName");            jobId = vars.get(Metrics.JOB_NAME);            indexOfSubTask = Integer.parseInt(vars.get(Metrics.SUBTASK_INDEX));        }        LOG.info("是否使用自定义报告 {}", useCustomReporter());        if (useCustomReporter()) {            customReporter =                    DataSyncFactoryUtil.discoverMetric(                            config, getRuntimeContext(), makeTaskFailedWhenReportFailed());            customReporter.open();            LOG.info("customReporter 的hashcode is {}", customReporter.hashCode());        }        startTime = System.currentTimeMillis();    }

通过排查useCustomReporter方法得知 jdbcConf.getInitReporter()是false,而在JdbcConfig类里面这个对象默认是true

/** 使用自定义的指标输出器把增量指标打到普罗米修斯 */    @Override    protected boolean useCustomReporter() {        return jdbcConf.isIncrement() && jdbcConf.getInitReporter();    }    /** 增量同步或者间隔轮询时,是否初始化外部存储 */    protected Boolean initReporter = true;

经过查找 initReporter 属性的set方法调用,找到了下面的问题在类 com.dtstack.chunjun.connector.jdbc.source.JdbcDynamicTableSource 中有个地方说暂时不支持SQL的方式尝试一下将false修改为true,然后在本地进行测试,测试的时候将pushgateway的host和port写到代码里面,执行任务发现pushgateway里面已经有数据了那么可以开始打包了,由于改了源代码,所以要先格式化代码 mvn spotless:apply 再打包 mvn clean package -DskipTests

后续问题

打包到虚拟机进行测试,我使用的是yarn-per-job模式,提交任务后发现报找不到Prometheus报告类的异常,通过异常信息发现在前面提到的方法里有classloader

public void openInputFormat() throws IOException {        Map vars = getRuntimeContext().getMetricGroup().getAllVariables();        if (vars != null) {            jobName = vars.getOrDefault(Metrics.JOB_NAME, "defaultJobName");            jobId = vars.get(Metrics.JOB_NAME);            indexOfSubTask = Integer.parseInt(vars.get(Metrics.SUBTASK_INDEX));        }        LOG.info("是否使用自定义报告 {}", useCustomReporter());        if (useCustomReporter()) {            customReporter =                    DataSyncFactoryUtil.discoverMetric(                            config, getRuntimeContext(), makeTaskFailedWhenReportFailed());            customReporter.open();            LOG.info("customReporter 的hashcode is {}", customReporter.hashCode());        }        startTime = System.currentTimeMillis();    }    public static CustomReporter discoverMetric(            ChunJunCommonConf commonConf,            RuntimeContext context,            boolean makeTaskFailedWhenReportFailed) {        try {            String pluginName = commonConf.getMetricPluginName();            // 这里获取到了类的全限定名 com.dtstack.chunjun.metrics.prometheus.PrometheusReport            String pluginClassName = PluginUtil.getPluginClassName(pluginName, OperatorType.metric);            MetricParam metricParam =                    new MetricParam(                            context, makeTaskFailedWhenReportFailed, commonConf.getMetricProps());            ClassLoader classLoader = Thread.currentThread().getContextClassLoader();            Class clazz = classLoader.loadClass(pluginClassName);            Constructor constructor = clazz.getConstructor(MetricParam.class);            return (CustomReporter) constructor.newInstance(metricParam);        } catch (Exception e) {            throw new ChunJunRuntimeException(e);        }    }

在本地的时候这里加载类的时候是没问题的,但是在线上的时候出现了了找不到类的异常,猜测是相关的jar没有加载到flink jvm进程里面,所以将项目里面的 chunjun-metrics-prometheus.jar 放到了flink的lib目录下,再次启动任务 问题得以解决!

标签: