Unknown Flume Agent error

#hadoop #hbase #flume

Question:

I get the following error in my Flume agent, where I use an AsyncHbaseEventSerializer. I suspect it is caused by my schema: I have two column families.

 MyAgent.sinks.MySink.columnFamily=family1,family2
 

When I specify both column families separated by a comma, I get an error saying that the table / column family was not found.

 MyAgent.sinks.MySink.columnFamily=family1 family2
 

If I specify both column families separated by a space, I think only the first column family is taken into account.

This is the error. Can anyone help me?

    Unable to deliver event. Exception follows.
    org.apache.flume.EventDeliveryException: Could not write events to Hbase. Transaction failed, and rolled back.
        at org.apache.flume.sink.hbase.AsyncHBaseSink.process(AsyncHBaseSink.java:317)
        at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
        at java.lang.Thread.run(Thread.java:745)

Update:

 myAgent.sources = TwitterSource InstagramSource FacebookSource
myAgent.channels = TwitterChannel InstagramChannelMedia InstagramChannelMediaComment FacebookChannelPage FacebookChannelPost FacebookChannelComment
myAgent.sinks = TwitterSink InstagramSinkMedia InstagramSinkMediaComment FacebookSinkPage FacebookSinkPost FacebookSinkComment

myAgent.sources.TwitterSource.type = com.my.socialanalytics.core.twitter.TwitterSource
myAgent.sources.TwitterSource.channels = TwitterChannel
myAgent.sources.TwitterSource.consumerKey = <>
myAgent.sources.TwitterSource.consumerSecret = <>
myAgent.sources.TwitterSource.accessToken = <>
myAgent.sources.TwitterSource.accessTokenSecret = <>
myAgent.sources.TwitterSource.crawlingFrequency = 300000
myAgent.sources.TwitterSource.mongoHost =10.3.0.38
myAgent.sources.TwitterSource.mongoPort =27017
myAgent.sources.TwitterSource.mongoDBName =mySocialAnalytics
myAgent.sources.TwitterSource.mongoCollectionCampaigns =Campaigns
myAgent.sources.TwitterSource.mongoUsername =admin
myAgent.sources.TwitterSource.mongoPassword =qburst
myAgent.sources.TwitterSource.mongoAuthDB =admin

myAgent.sinks.TwitterSink.type=org.apache.flume.sink.hbase.AsyncHBaseSink
myAgent.sinks.TwitterSink.channel=TwitterChannel
myAgent.sinks.TwitterSink.table=Tweets
myAgent.sinks.TwitterSink.columnFamily=campaign tweet
myAgent.sinks.TwitterSink.serializer=com.my.socialanalytics.core.twitter.TwitterSerializer
myAgent.sinks.TwitterSink.serializer.columns=tweet:contributors tweet:createdAt tweet:inReplyToUserID tweet:text tweet:inReplyToStatusID tweet:source tweet:lang tweet:geo tweet:favorited tweet:withheldCopyright campaign:stateCode tweet:truncated tweet:entities tweet:inReplyToScreenName tweet:withheldInCountries campaign:campaignID tweet:favoriteCount tweet:id tweet:user tweet:retweetedStatus campaign:countryCode tweet:possiblySensitive tweet:currentUserRetweet tweet:retweetCount tweet:withheldScope tweet:retweeted tweet:coordinates tweet:filterLevel tweet:quotedStatusID campaign:sentiment tweet:place
myAgent.sinks.TwitterSink.batchSize = 50

myAgent.channels.TwitterChannel.type = memory
myAgent.channels.TwitterChannel.capacity = 10000000
myAgent.channels.TwitterChannel.transactionCapacity = 100000

myAgent.sources.InstagramSource.type = com.my.socialanalytics.core.instagram.InstagramSource
myAgent.sources.InstagramSource.channels = InstagramChannelMedia InstagramChannelMediaComment
myAgent.sources.InstagramSource.clientID = <>
myAgent.sources.InstagramSource.clientSecret = <>
myAgent.sources.InstagramSource.accessToken = <>
myAgent.sources.InstagramSource.crawlingFrequency = 3600000
myAgent.sources.InstagramSource.mongoHost =10.3.0.38
myAgent.sources.InstagramSource.mongoPort =27017
myAgent.sources.InstagramSource.mongoDBName =mySocialAnalytics
myAgent.sources.InstagramSource.mongoCollectionCampaigns =Campaigns
myAgent.sources.InstagramSource.mongoUsername =admin
myAgent.sources.InstagramSource.mongoPassword =qburst
myAgent.sources.InstagramSource.mongoAuthDB =admin
myAgent.sources.InstagramSource.selector.type = multiplexing
myAgent.sources.InstagramSource.selector.header = feedType
myAgent.sources.InstagramSource.selector.mapping.feedTypeMedia = InstagramChannelMedia
myAgent.sources.InstagramSource.selector.mapping.feedTypeComment = InstagramChannelMediaComment
myAgent.sources.InstagramSource.selector.default = InstagramChannelMedia

myAgent.sinks.InstagramSinkMedia.type=org.apache.flume.sink.hbase.AsyncHBaseSink
myAgent.sinks.InstagramSinkMedia.channel=InstagramChannelMedia
myAgent.sinks.InstagramSinkMedia.table=InstagramMedia
myAgent.sinks.InstagramSinkMedia.columnFamily=campaign media
myAgent.sinks.InstagramSinkMedia.serializer=com.my.socialanalytics.core.instagram.InstagramMediaSerializer
myAgent.sinks.InstagramSinkMedia.serializer.columns=media:createdTime campaign:campaignID media:link media:videos media:type media:caption campaign:countryCode media:filter media:likes media:attribution media:comments media:user campaign:updatedAt media:tags campaign:stateCode media:images campaign:sentiment media:location media:id
myAgent.sinks.InstagramSinkMedia.batchSize = 50

myAgent.sinks.InstagramSinkMediaComment.type=org.apache.flume.sink.hbase.AsyncHBaseSink
myAgent.sinks.InstagramSinkMediaComment.channel=InstagramChannelMediaComment
myAgent.sinks.InstagramSinkMediaComment.table=InstagramComments
myAgent.sinks.InstagramSinkMediaComment.columnFamily=campaign comment
myAgent.sinks.InstagramSinkMediaComment.serializer=com.my.socialanalytics.core.instagram.InstagramMediaCommentSerializer
myAgent.sinks.InstagramSinkMediaComment.serializer.columns=comment:mediaId comment:id campaign:campaignID comment:createdTime campaign:updatedAt comment:from comment:text campaign:sentiment
myAgent.sinks.InstagramSinkMediaComment.batchSize = 50

myAgent.channels.InstagramChannelMedia.type = memory
myAgent.channels.InstagramChannelMedia.capacity = 10000000
myAgent.channels.InstagramChannelMedia.transactionCapacity = 100000

myAgent.channels.InstagramChannelMediaComment.type = memory
myAgent.channels.InstagramChannelMediaComment.capacity = 10000000
myAgent.channels.InstagramChannelMediaComment.transactionCapacity = 100000

myAgent.sources.FacebookSource.type = com.my.socialanalytics.core.facebook.FacebookSource
myAgent.sources.FacebookSource.channels = FacebookChannelPage FacebookChannelPost FacebookChannelComment
myAgent.sources.FacebookSource.accessToken = <>
myAgent.sources.FacebookSource.crawlingFrequency = 3600000
myAgent.sources.FacebookSource.mongoHost =10.3.0.38
myAgent.sources.FacebookSource.mongoPort =27017
myAgent.sources.FacebookSource.mongoDBName =mySocialAnalytics
myAgent.sources.FacebookSource.mongoCollectionCampaigns =Campaigns
myAgent.sources.FacebookSource.mongoUsername =admin
myAgent.sources.FacebookSource.mongoPassword =qburst
myAgent.sources.FacebookSource.mongoAuthDB =admin
myAgent.sources.FacebookSource.selector.type = multiplexing
myAgent.sources.FacebookSource.selector.header = feedType
myAgent.sources.FacebookSource.selector.mapping.feedTypePage = FacebookChannelPage
myAgent.sources.FacebookSource.selector.mapping.feedTypePost = FacebookChannelPost
myAgent.sources.FacebookSource.selector.mapping.feedTypeComment = FacebookChannelComment
myAgent.sources.FacebookSource.selector.default = FacebookChannelPage

myAgent.sinks.FacebookSinkPage.type=org.apache.flume.sink.hbase.AsyncHBaseSink
myAgent.sinks.FacebookSinkPage.channel=FacebookChannelPage
myAgent.sinks.FacebookSinkPage.table=FacebookPages
myAgent.sinks.FacebookSinkPage.columnFamily=campaign page
myAgent.sinks.FacebookSinkPage.serializer=com.my.socialanalytics.core.facebook.FacebookPageSerializer
myAgent.sinks.FacebookSinkPage.serializer.columns=page:talkingAboutCount campaign:campaignID page:name page:wereHereCount page:fanCount page:id page:checkins campaign:day
myAgent.sinks.FacebookSinkPage.batchSize = 50

myAgent.sinks.FacebookSinkPost.type=org.apache.flume.sink.hbase.AsyncHBaseSink
myAgent.sinks.FacebookSinkPost.channel=FacebookChannelPost
myAgent.sinks.FacebookSinkPost.table=FacebookPosts
myAgent.sinks.FacebookSinkPost.columnFamily=post campaign
myAgent.sinks.FacebookSinkPost.serializer=com.my.socialanalytics.core.facebook.FacebookPostSerializer
myAgent.sinks.FacebookSinkPost.serializer.columns=post:link campaign:campaignID post:place post:sharesCount post:name post:from campaign:updatedAt post:caption post:id post:likesCount post:createdTime post:message campaign:sentiment
myAgent.sinks.FacebookSinkPost.batchSize = 50

myAgent.sinks.FacebookSinkComment.type=org.apache.flume.sink.hbase.AsyncHBaseSink
myAgent.sinks.FacebookSinkComment.channel=FacebookChannelComment
myAgent.sinks.FacebookSinkComment.table=FacebookComments
myAgent.sinks.FacebookSinkComment.columnFamily=campaign comment
myAgent.sinks.FacebookSinkComment.serializer=com.my.socialanalytics.core.facebook.FacebookCommentSerializer
myAgent.sinks.FacebookSinkComment.serializer.columns=comment:message comment:id campaign:campaignID comment:commentCount comment:parent comment:createdTime campaign:updatedAt comment:from comment:postID campaign:sentiment
myAgent.sinks.FacebookSinkComment.batchSize = 50

myAgent.channels.FacebookChannelPage.type = memory
myAgent.channels.FacebookChannelPage.capacity = 10000000
myAgent.channels.FacebookChannelPage.transactionCapacity = 100000

myAgent.channels.FacebookChannelPost.type = memory
myAgent.channels.FacebookChannelPost.capacity = 10000000
myAgent.channels.FacebookChannelPost.transactionCapacity = 100000

myAgent.channels.FacebookChannelComment.type = memory
myAgent.channels.FacebookChannelComment.capacity = 10000000
myAgent.channels.FacebookChannelComment.transactionCapacity = 100000
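
Every sink above lists columns from two families in `serializer.columns` (for example `page:*` and `campaign:*` for `FacebookSinkPage`), while `columnFamily` is a single property. Below is a minimal sketch, assuming the custom serializers read `serializer.columns` as whitespace-separated `family:qualifier` pairs; their source is not shown, so that format is an assumption, and `ColumnSpecParser` and `groupByFamily` are hypothetical names, not part of Flume or of the author's code. It groups the qualifiers by family so you can see which families a single `columnFamily` value would leave out.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical helper, not the author's serializer: splits a serializer.columns
 * value of the assumed form "family:qualifier family:qualifier ..." and groups
 * the qualifiers by column family, keeping first-seen order.
 */
public class ColumnSpecParser {

    static Map<String, List<String>> groupByFamily(String columnsProperty) {
        Map<String, List<String>> byFamily = new LinkedHashMap<>();
        for (String spec : columnsProperty.trim().split("\\s+")) {
            int colon = spec.indexOf(':');
            // Reject entries without a non-empty family and a non-empty qualifier.
            if (colon <= 0 || colon == spec.length() - 1) {
                throw new IllegalArgumentException("Expected family:qualifier, got: " + spec);
            }
            String family = spec.substring(0, colon);
            String qualifier = spec.substring(colon + 1);
            byFamily.computeIfAbsent(family, f -> new ArrayList<>()).add(qualifier);
        }
        return byFamily;
    }

    public static void main(String[] args) {
        // Value of myAgent.sinks.FacebookSinkPage.serializer.columns from the config above.
        String columns = "page:talkingAboutCount campaign:campaignID page:name page:wereHereCount "
                + "page:fanCount page:id page:checkins campaign:day";
        Map<String, List<String>> grouped = groupByFamily(columns);
        grouped.forEach((family, qualifiers) -> System.out.println(family + " -> " + qualifiers));

        // Hypothetical single-family sink setting, as suggested in the comments.
        String configuredFamily = "page";
        for (String family : grouped.keySet()) {
            if (!family.equals(configuredFamily)) {
                System.out.println("not covered by columnFamily=" + configuredFamily + ": " + family);
            }
        }
    }
}
```

With the hypothetical setting `columnFamily=page`, the `campaign` columns are reported as not covered; whether the real serializers still write them depends on code not shown in the question.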
 

Initially I don't get any error, but after a couple of hours I get this one. I run several Spark jobs that update a specific field in the same table, and I'm not sure whether that is the cause. My channel frequently updates the same records. Could this be the cause of the error?

Update 2: if I specify both column families, I get the error below.

    Oct 14, 4:21:11.040 PM   ERROR   org.apache.flume.lifecycle.LifecycleSupervisor
    Unable to start SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@4c74990e counterGroup:{ name:null counters:{} } } - Exception follows.
    org.apache.flume.FlumeException: Could not start sink. Table or column family does not exist in Hbase.
        at org.apache.flume.sink.hbase.AsyncHBaseSink.initHBaseClient(AsyncHBaseSink.java:489)
        at org.apache.flume.sink.hbase.AsyncHBaseSink.start(AsyncHBaseSink.java:441)
        at org.apache.flume.sink.DefaultSinkProcessor.start(DefaultSinkProcessor.java:46)
        at org.apache.flume.SinkRunner.start(SinkRunner.java:79)
        at org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:251)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

If I specify only one column family, I get this error:

    Unable to deliver event. Exception follows.
    org.apache.flume.EventDeliveryException: Could not write events to Hbase. Transaction failed, and rolled back.
        at org.apache.flume.sink.hbase.AsyncHBaseSink.process(AsyncHBaseSink.java:317)
        at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
        at java.lang.Thread.run(Thread.java:745)

Comments:

1. Hi Vaisakh, can you post the whole Flume sink configuration file?

2. @Vijay_Shinde Thanks for the reply. I updated the question with the configuration.

3. Your configuration looks correct, but we need more details about the exception. Could you please update the question with the full stack trace?

4. @Vijay_Shinde I updated the question. Can't we specify two column families for the sink?

5. I tried with one column family and it works. Can you first try with a single column family?
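
Update 2 and this last comment appear consistent with the sink treating the whole `columnFamily` value as one literal family name rather than as a list. The sketch below is a simulation only, not Flume or HBase code: it assumes the value reaches HBase unsplit, and the class `ColumnFamilyCheck` and method `familyExists` are hypothetical. Under that assumption, `campaign tweet` and `campaign,tweet` name a family that does not exist in a table with families `campaign` and `tweet`, while `tweet` alone passes the check.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.TreeSet;

/**
 * Simulation only: does NOT call Flume or HBase. It assumes the sink passes the
 * whole columnFamily property value to HBase as ONE family name, without splitting.
 */
public class ColumnFamilyCheck {

    // True if the raw property value matches an existing family name exactly.
    static boolean familyExists(Set<String> tableFamilies, String columnFamilyProperty) {
        // No split on ',' or ' ': the value is compared verbatim, per the assumption above.
        return tableFamilies.contains(columnFamilyProperty);
    }

    public static void main(String[] args) {
        // Hypothetical families of the Tweets table (campaign and tweet, as in the config).
        Set<String> tweetsFamilies = new TreeSet<>(Arrays.asList("campaign", "tweet"));

        for (String value : new String[] {"campaign,tweet", "campaign tweet", "tweet"}) {
            System.out.println("columnFamily=" + value + " -> exists: "
                    + familyExists(tweetsFamilies, value));
        }
    }
}
```

If that assumption holds, writing to the second family would presumably have to be handled inside the custom serializer, since the sink-level setting can name only one family.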