(资料图)
chunjun的官网文档对增量同步已经做出了一定的说明目前我司的大数据平台使用的是flink技术栈,底层的连接器插件使用的是国产的chunjun插件,在使用chunjun的过程中也遇到了很多问题,本次记录下在SQL模式的情况下怎么支持增量的数据同步
纯钧官方根据文档我编写了一个SQL脚本
create table `source` ( `sfzh` STRING COMMENT "", `xm` STRING COMMENT "", `xb` STRING COMMENT "", `xbdm` STRING COMMENT "", `jzdz` STRING COMMENT "", `fzrq` DATE COMMENT "", `dsc_biz_record_id` STRING COMMENT "") with ( "connector" = "mysql-x", "url" = "jdbc:mysql://:/?useSSL=false&useInformationSchema=true&nullCatalogMeansCurrent=true", "table-name" = "", "username" = "", "password" = "", "scan.fetch-size" = "1024", "scan.increment.column" = "fzrq", --"scan.increment.column-type" = "date", "scan.start-location" = "1659974400000");create table `sink` ( `sfzh` STRING COMMENT "", `xm` STRING COMMENT "", `xb` STRING COMMENT "", `xbdm` STRING COMMENT "", `jzdz` STRING COMMENT "", `fzrq` DATE COMMENT "", `dsc_biz_record_id` STRING COMMENT "", PRIMARY KEY (`dsc_biz_record_id`) NOT ENFORCED) with ( "connector" = "stream-x");然后提交任务的时候发现已经记录了start-location和 start-location的指标信息了,但是并没有上报到Prometheus!
在本地调试源码解决问题的大致过程在类 com.dtstack.chunjun.source.format.BaseRichInputFormat中有一个成员变量
/** 自定义的prometheus reporter,用于提交startLocation和endLocation指标 */protected transient CustomReporter customReporter;该变量是用来提交增量信息的对象,flink任务在开始的时候会执行一下方法
@Override public void openInputFormat() throws IOException { Map vars = getRuntimeContext().getMetricGroup().getAllVariables(); if (vars != null) { jobName = vars.getOrDefault(Metrics.JOB_NAME, "defaultJobName"); jobId = vars.get(Metrics.JOB_NAME); indexOfSubTask = Integer.parseInt(vars.get(Metrics.SUBTASK_INDEX)); } LOG.info("是否使用自定义报告 {}", useCustomReporter()); if (useCustomReporter()) { customReporter = DataSyncFactoryUtil.discoverMetric( config, getRuntimeContext(), makeTaskFailedWhenReportFailed()); customReporter.open(); LOG.info("customReporter 的hashcode is {}", customReporter.hashCode()); } startTime = System.currentTimeMillis(); } 通过排查useCustomReporter方法得知 jdbcConf.getInitReporter()是false,而在JdbcConfig类里面这个对象默认是true
/** 使用自定义的指标输出器把增量指标打到普罗米修斯 */ @Override protected boolean useCustomReporter() { return jdbcConf.isIncrement() && jdbcConf.getInitReporter(); } /** 增量同步或者间隔轮询时,是否初始化外部存储 */ protected Boolean initReporter = true;经过查找 initReporter 属性的set方法调用,找到了下面的问题在类 com.dtstack.chunjun.connector.jdbc.source.JdbcDynamicTableSource 中有个地方说暂时不支持SQL的方式尝试一下将false修改为true,然后在本地进行测试,测试的时候将pushgateway的host和port写到代码里面,执行任务发现pushgateway里面已经有数据了那么可以开始打包了,由于改了源代码,所以要先格式化代码 mvn spotless:apply 再打包 mvn clean package -DskipTests
后续问题打包到虚拟机进行测试,我使用的是yarn-per-job模式,提交任务后发现报找不到Prometheus报告类的异常,通过异常信息发现在前面提到的方法里有classloader
public void openInputFormat() throws IOException { Map vars = getRuntimeContext().getMetricGroup().getAllVariables(); if (vars != null) { jobName = vars.getOrDefault(Metrics.JOB_NAME, "defaultJobName"); jobId = vars.get(Metrics.JOB_NAME); indexOfSubTask = Integer.parseInt(vars.get(Metrics.SUBTASK_INDEX)); } LOG.info("是否使用自定义报告 {}", useCustomReporter()); if (useCustomReporter()) { customReporter = DataSyncFactoryUtil.discoverMetric( config, getRuntimeContext(), makeTaskFailedWhenReportFailed()); customReporter.open(); LOG.info("customReporter 的hashcode is {}", customReporter.hashCode()); } startTime = System.currentTimeMillis(); } public static CustomReporter discoverMetric( ChunJunCommonConf commonConf, RuntimeContext context, boolean makeTaskFailedWhenReportFailed) { try { String pluginName = commonConf.getMetricPluginName(); // 这里获取到了类的全限定名 com.dtstack.chunjun.metrics.prometheus.PrometheusReport String pluginClassName = PluginUtil.getPluginClassName(pluginName, OperatorType.metric); MetricParam metricParam = new MetricParam( context, makeTaskFailedWhenReportFailed, commonConf.getMetricProps()); ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); Class> clazz = classLoader.loadClass(pluginClassName); Constructor> constructor = clazz.getConstructor(MetricParam.class); return (CustomReporter) constructor.newInstance(metricParam); } catch (Exception e) { throw new ChunJunRuntimeException(e); } } 在本地的时候这里加载类的时候是没问题的,但是在线上的时候出现了了找不到类的异常,猜测是相关的jar没有加载到flink jvm进程里面,所以将项目里面的 chunjun-metrics-prometheus.jar 放到了flink的lib目录下,再次启动任务 问题得以解决!
-
基于chunjun纯钧的增量数据同步问题排查【博客园-实习小生】 全球今日讯基于chunjun纯钧的增量数据同步目前我司的大数据平台使用的是flink技术栈,底层的连接器插件使用的是国产的chunjun插件,在使用chunjun的过程 -
最资讯丨品味岁月陈酿 国台即将启动老酒置换计划4月11日,国台国标(2012-2018年酿造)真实年份老酒鉴赏会在成都召开。我们从不同酿造年份的国台国标,可以品鉴到时间的味道,探索到时间的秘密 -
世界快资讯丨C常青4月12日快速上涨以下是C常青在北京时间4月12日13:41分盘口异动快照:4月12日,C常青盘中快速上涨,5分钟内涨幅超过2%,截至13点41分,报37元,成交3 87亿元, -
投资30亿,长乐一大型商旅综合体备案报建!投资30亿,长乐一大型商旅综合体备案报建! -
熵基科技:4月11日融资买入1135.82万元,融资融券余额1.48亿元4月11日,熵基科技(301330)融资买入1135 82万元,融资偿还1407 93万元,融资净卖出272 11万元,融资余额1 48亿元,近20个交易日中有11个交易 -
北京发布沙尘蓝色预警信号【北京发布沙尘蓝色预警信号】据气象北京微博消息,北京市气象台2023年4月12日9时55分发布沙尘蓝色预警信号:受偏南风影响,北京南部地区已出 -
往期回顾:泰国登革热致11人死亡 登革热死亡率_天天观天下一、泰国登革热致11人死亡4月10日,泰国公共卫生部就该国登革热情况发布报告称,2023年1月1日至4月5日期间,泰国登革热确诊病例达到1 439万例 -
快看点丨一季度中欧班列开行4186列 承运货物日益丰富央视网消息(新闻联播):国铁集团最新数据显示,一季度,中欧班列共开行4186列,运送44 9万标箱,同比分别增长15%和28%,承运货物日益丰富。 -
与信心一起前行_关于自信的作文600字相关作文 中考作文素材抄写作文网小编为大家提供与信心一起前行_关于自信的作文600字相关作文中考作文素材来供大家参考,欢迎阅读。与信心一起前行_关于自信的作文60 -
加拿大警方查获87支从美国走私入境的枪支 世界热推荐加拿大多伦多警方当地时间4月11日宣布,近日,他们从来自美国的车辆中缴获87支枪支,同时还缴获了45个超容量弹匣、1454发子弹等。警方表示,缴 -
电力体制改革板块4月11日跌0.22%,通宝能源领跌,主力资金净流出4765.68万元 全球快资讯4月11日电力体制改革板块较上一交易日下跌0 22%,通宝能源领跌。当日上证指数报收于3313 57,下跌0 05%。深证成指报收于11877 15,上涨0 04%。 -
南宁:美人蕉花开那考河 南湖花海等你来-全球微资讯那考河美人蕉花开。记者宋延康摄南湖边,市民开心合影。南宁云—南宁晚报讯(记者叶祯)4月的春风,争相绽放的花卉把邕城打扮成一个美丽的... -
精彩看点:拓尔思:公司具有众多国家标杆项目成功经验以及本土化服务能力 可以平滑替代ES拓尔思(300229)04月11日在投资者互动平台表示:尊敬的投资者:您好!拓尔思的“海贝搜索数据库”(以下简称“海贝”)是一款从内核到系统... -
招商蛇口回复问询函:收购南油集团、招商前海实业股权具有必要性 全球观热点本次交易完成后,招商蛇口对招商前海实业的实际权益份额增加24%,合计享有的招商前海实业实际权益份额85 99%。 -
永鑫保险销售主动注销基金销售牌照背后:两家公司撑起7年公募代销生意-天天信息4月10日,上海证监局披露了注销永鑫保险销售服务有限公司公募证券投资基金销售业务许可证的公告,系公司主动提交注销公募证券投资基金销售业务 -
甘州区北街街道流泉社区开展“免费孕前优生优育健康检查”宣传活动甘州融媒讯为使广大居民群众充分认识并积极参与免费婚前及孕前优生健康检查,近日,甘州区北街街道流泉社区工作人员、志愿者在辖区内持续开展 -
天天观速讯丨中汽协:一季度新能源汽车销量158.6万辆,市占率超26%新京报贝壳财经讯(记者王琳琳)4月11日,中国汽车工业协会(以下简称“中汽协”)发布3月汽车产销数据,3月新能源汽车延续良好发展态势,产销... -
环球精选!高校千万师生同上一堂国家安全教育课视频观看时间+方式+内容2023年高校千万师生同上一堂国家安全教育课视频直播观看直播观看时间:2023年4月14日15:00观看方式:师生可扫描中国大学生在线官网、视频号、 -
华峰化学(002064)4月11日主力资金净买入1601.36万元截至2023年4月11日收盘,华峰化学(002064)报收于7 53元,上涨2 31%,换手率0 43%,成交量21 05万手,成交额1 58亿元。 -
等了18年,茶花板块这个“隐秘神盘”的二期终于要动了?金牛区中环路一品天下大街段南侧,有一宗长满杂草的空地静静躺了许久,与街对面热闹的西宸天街形成了强烈的反差感。文 成都商报-红星新闻记者 -
玲珑轮胎:公司盈利还会延续 未来不排除有进一步产品提价的可能-当前看点据第一财经,玲珑轮胎发布业绩预告。预计2023年第一季度实现归属于上市公司股东的净利润2 0亿元到2 2亿元。公司表示, -
开车犯困太吓人!这几招让你提神醒脑 全球播报随着天气逐渐转暖“春困”现象也日渐显著闲坐时打瞌睡不要紧但若是在开车时犯起困来可就危险了在我国的《道路交通安全法》中,明确规定了当... -
天天简讯:尚品生物“飞检”不达标被责令停产整改 生产的化妆品曾被检出禁用原料中国网财经4月11日讯(记者顾凡)昨日,国家药监局发布“关于广州市西美生物科技有限公司等企业飞行检查结果”的通告。 -
世界资讯:大模型技术的军事应用无论是ChatGPT还是之前流行的AI作画等应用,其底层主要依靠的都是大模型技术。 -
云南推进非遗博览园项目选址工作来自云南省文化和旅游厅的信息显示,今年,云南省将推进云南非遗博览园(含云南非遗展示馆)项目选址工作,提高全省非遗系统性保护水平。云南 -
赛龙夺锦·一线行动 | 让数据资源可流转快流通 速看什么是数据经纪人呢?他们到底能起到什么作用?对于这一新事物,近日,记者走进首批持牌数据经纪人公司广金征信,揭开数据经纪人的神秘面纱。 -
填平校园“培训贷”陷阱需要综合施策 视点现实中,大学生陷入校园“培训贷”陷阱并非个例。一些培训机构与小额贷款平台利用大学生渴求掌握相关技能挣快钱的心理,以“先学后付”“免... -
是指什么意思_空头陷阱是什么意思1、空头陷阱的意思是:机构与庄家制作假的K线图与假的买卖单 技术指标等。2、让你买进股票而达到他出货与进货的目的。3、这 -
环球资讯:现场 | 新世界林浩文:大家都在变 不会墨守成规观点网香港报道自疫情爆发以来,在家工作以及持续封关令企业对香港写字楼租赁需求放缓,加上港府近年积极推出商业地,处于持续的供过于求下, -
每日热文:夺欧冠有多难?瓜帅:乔丹15个赛季拿了6个冠军,输的比赢的多夺欧冠有多难?瓜帅:乔丹15个赛季拿了6个冠军,输的比赢的多,欧冠,拜仁,瓜帅,男子篮球,英国足球,西班牙足球,迈克尔-乔丹,佩普·瓜迪奥拉,乔丹



