flink-sql入es报错:Missing required options are document-type
完整的报错
Exception in thread “main” org.apache.flink.table.api.ValidationException: Unable to create a sink for writing table ‘default_catalog.default_database.sink_ella_operation_log’.
Table options are:
‘connector’=‘elasticsearch-6’
‘hosts’=‘http://bigdatanode01:9200;http://bigdatanode02:9200;http://bigdatanode03:9200’
‘index’=‘ella_operation_log’
at org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:171)
at org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:367)
at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:201)
at org.apache.flink.table.planner.delegation.PlannerBaseKaTeX parse error: Can't use function '$' in math mode at position 8: anonfun$̲1.apply(Planner…anonfun 1. a p p l y ( P l a n n e r B a s e . s c a l a : 162 ) a t s c a l a . c o l l e c t i o n . T r a v e r s a b l e L i k e 1.apply(PlannerBase.scala:162) at scala.collection.TraversableLike 1.apply(PlannerBase.scala:162)atscala.collection.TraversableLike a n o n f u n anonfun anonfunmap 1. a p p l y ( T r a v e r s a b l e L i k e . s c a l a : 234 ) a t s c a l a . c o l l e c t i o n . T r a v e r s a b l e L i k e 1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike 1.apply(TraversableLike.scala:234)atscala.collection.TraversableLike a n o n f u n anonfun anonfunmap 1. a p p l y ( T r a v e r s a b l e L i k e . s c a l a : 234 ) a t s c a l a . c o l l e c t i o n . I t e r a t o r 1.apply(TraversableLike.scala:234) at scala.collection.Iterator 1.apply(TraversableLike.scala:234)atscala.collection.Iteratorclass.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at scala.collection.IterableLike c l a s s . f o r e a c h ( I t e r a b l e L i k e . s c a l a : 72 ) a t s c a l a . c o l l e c t i o n . A b s t r a c t I t e r a b l e . f o r e a c h ( I t e r a b l e . s c a l a : 54 ) a t s c a l a . c o l l e c t i o n . T r a v e r s a b l e L i k e class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableLike class.foreach(IterableLike.scala:72)atscala.collection.AbstractIterable.foreach(Iterable.scala:54)atscala.collection.TraversableLikeclass.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:162)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1518)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:740)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:856)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:730)
at com.medbook.foreign.mysql.ella.SyncEllaOperationToES.sink(SyncEllaOperationToES.scala:65)
at com.medbook.foreign.mysql.ella.Exec . m a i n ( E x e c . s c a l a : 6 ) a t c o m . m e d b o o k . f o r e i g n . m y s q l . e l l a . E x e c . m a i n ( E x e c . s c a l a ) C a u s e d b y : o r g . a p a c h e . f l i n k . t a b l e . a p i . V a l i d a t i o n E x c e p t i o n : O n e o r m o r e r e q u i r e d o p t i o n s a r e m i s s i n g . M i s s i n g r e q u i r e d o p t i o n s a r e : d o c u m e n t − t y p e a t o r g . a p a c h e . f l i n k . t a b l e . f a c t o r i e s . F a c t o r y U t i l . v a l i d a t e F a c t o r y O p t i o n s ( F a c t o r y U t i l . j a v a : 381 ) a t o r g . a p a c h e . f l i n k . t a b l e . f a c t o r i e s . F a c t o r y U t i l . v a l i d a t e F a c t o r y O p t i o n s ( F a c t o r y U t i l . j a v a : 354 ) a t o r g . a p a c h e . f l i n k . t a b l e . f a c t o r i e s . F a c t o r y U t i l .main(Exec.scala:6) at com.medbook.foreign.mysql.ella.Exec.main(Exec.scala) Caused by: org.apache.flink.table.api.ValidationException: One or more required options are missing. Missing required options are: document-type at org.apache.flink.table.factories.FactoryUtil.validateFactoryOptions(FactoryUtil.java:381) at org.apache.flink.table.factories.FactoryUtil.validateFactoryOptions(FactoryUtil.java:354) at org.apache.flink.table.factories.FactoryUtil .main(Exec.scala:6)atcom.medbook.foreign.mysql.ella.Exec.main(Exec.scala)Causedby:org.apache.flink.table.api.ValidationException:Oneormorerequiredoptionsaremissing.Missingrequiredoptionsare:document−typeatorg.apache.flink.table.factories.FactoryUtil.validateFactoryOptions(FactoryUtil.java:381)atorg.apache.flink.table.factories.FactoryUtil.validateFactoryOptions(FactoryUtil.java:354)atorg.apache.flink.table.factories.FactoryUtilTableFactoryHelper.validate(FactoryUtil.java:712)
at org.apache.flink.streaming.connectors.elasticsearch.table.Elasticsearch6DynamicSinkFactory.createDynamicTableSink(Elasticsearch6DynamicSinkFactory.java:92)
at org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:168)
… 20 more
如下图所示提取有用的信息
分析错误原因
我是在flink-sql创建es表的时候报的错,报错提示缺少对应的options,及document-type
,我连忙去flink官方文档查找答案:https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/elasticsearch/#document-type
从官方文档给出的连接参数可以看出document-type在es6中需要指定,而在es7中无需指定,原因大概是6.0版本及以前一个索引里可以创建多个类型(type),7.0版本开始废弃type,一般使用_doc代替了, 8.0版本会彻底废弃。
我这里使用的是6.x版本的es,所以自然要配置好document-type这个option。
文章来源: blog.csdn.net,作者:橙子园,版权归原作者所有,如需转载,请联系作者。
原文链接:blog.csdn.net/Chenftli/article/details/125105275
- 点赞
- 收藏
- 关注作者
评论(0)