设为首页 收藏本站
查看: 1296|回复: 0

[经验分享] logstash升级kafka插件

[复制链接]
累计签到:1 天
连续签到:1 天
发表于 2016-6-21 08:56:08 | 显示全部楼层 |阅读模式
Logstash 2.x版本kafka升级

    V1

前言

Logstash 2.x版本output-kafka插件只支持kafka-0.8.x版本。但是工作中我们可能用到0.9.x版本的kafka。故而需要升级Logstash-output-kafka插件至3.x版本。

    安装依赖包

yum -y install ruby rubygems ruby-devel
gem sources --add https://ruby.taobao.org/ --remove http://rubygems.org/
gem install jar-dependencies -v '0.3.4'
gem install ruby-maven -v '3.3.11'

    升级output-kafka

/usr/local/logstash/bin/logstash-plugin update logstash-output-kafka

    启动logstash 有如下警告信息

./logstash -f /usr/local/logstash/conf/kafka.conf
Settings: Default pipeline workers: 8
log4j:WARN No appenders could be found for logger (org.apache.kafka.clients.producer.ProducerConfig).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Pipeline main started

解决办法

参考网站

1.切换到/usr/local/logstash/vendor/bundle/jruby/1.9/gems/logstash-output-kafka-3.0.1/lib/logstash/outputs/目录下

cd /usr/local/logstash/vendor/bundle/jruby/1.9/gems/logstash-output-kafka-3.0.1/lib/logstash/outputs/

2.备份kafka.rb文件

mv kafka.rb{,.backup}

3.新建kafka.rb文件内容如下:

require 'logstash/namespace'
require 'logstash/outputs/base'
require 'jruby-kafka'

# Write events to a Kafka topic. This uses the Kafka Producer API to write messages to a topic on
# the broker.
#
# The only required configuration is the topic name. The default codec is json,
# so events will be persisted on the broker in json format. If you select a codec of plain,
# Logstash will encode your messages with not only the message but also with a timestamp and
# hostname. If you do not want anything but your message passing through, you should make the output
# configuration something like:
# [source,ruby]
#     output {
#       kafka {
#         codec => plain {
#            format => "%{message}"
#         }
#       }
#     }
# For more information see http://kafka.apache.org/documentation.html#theproducer
#
# Kafka producer configuration: http://kafka.apache.org/documentation.html#newproducerconfigs
class LogStash::Outputs::Kafka < LogStash::Outputs::Base
  config_name 'kafka'

  default :codec, 'json'

  # The topic to produce messages to
  config :topic_id, :validate => :string, :required => true
  # This is for bootstrapping and the producer will only use it for getting metadata (topics,
  # partitions and replicas). The socket connections for sending the actual data will be
  # established based on the broker information returned in the metadata. The format is
  # `host1:port1,host2:port2`, and the list can be a subset of brokers or a VIP pointing to a
  # subset of brokers.
  config :bootstrap_servers, :validate => :string, :default => 'localhost:9092'
  # Serializer class for the key of the message
  config :key_serializer, :validate => :string, :default => 'org.apache.kafka.common.serialization.StringSerializer'
  # Serializer class for the value of the message
  config :value_serializer, :validate => :string, :default => 'org.apache.kafka.common.serialization.StringSerializer'
  # The key that will be included with the record
  #
  # If a `message_key` is present, a partition will be chosen using a hash of the key.
  # If not present, a partition for the message will be assigned in a round-robin fashion.
  config :message_key, :validate => :string
  # The number of acknowledgments the producer requires the leader to have received
  # before considering a request complete.
  #
  # acks=0,   the producer will not wait for any acknowledgment from the server at all.
  # acks=1,   This will mean the leader will write the record to its local log but
  #           will respond without awaiting full acknowledgement from all followers.
  # acks=all, This means the leader will wait for the full set of in-sync replicas to acknowledge the record.
  config :acks, :validate => ["0", "1", "all"], :default => "1"
  # The total bytes of memory the producer can use to buffer records waiting to be sent to the server.
  config :buffer_memory, :validate => :number, :default => 33554432
  # The compression type for all data generated by the producer.
  # The default is none (i.e. no compression). Valid values are none, gzip, or snappy.
  config :compression_type, :validate => ["none", "gzip", "snappy"], :default => "none"
  # Setting a value greater than zero will cause the client to
  # resend any record whose send fails with a potentially transient error.
  config :retries, :validate => :number, :default => 0
  # The producer will attempt to batch records together into fewer requests whenever multiple
  # records are being sent to the same partition. This helps performance on both the client
  # and the server. This configuration controls the default batch size in bytes.
  config :batch_size, :validate => :number, :default => 16384
  # The id string to pass to the server when making requests.
  # The purpose of this is to be able to track the source of requests beyond just
  # ip/port by allowing a logical application name to be included with the request
  config :client_id, :validate => :string
  # The producer groups together any records that arrive in between request
  # transmissions into a single batched request. Normally this occurs only under
  # load when records arrive faster than they can be sent out. However in some circumstances
  # the client may want to reduce the number of requests even under moderate load.
  # This setting accomplishes this by adding a small amount of artificial delay—that is,
  # rather than immediately sending out a record the producer will wait for up to the given delay
  # to allow other records to be sent so that the sends can be batched together.
  config :linger_ms, :validate => :number, :default => 0
  # The maximum size of a request
  config :max_request_size, :validate => :number, :default => 1048576
  # The size of the TCP receive buffer to use when reading data
  config :receive_buffer_bytes, :validate => :number, :default => 32768
  # The size of the TCP send buffer to use when sending data.
  config :send_buffer_bytes, :validate => :number, :default => 131072
  # The configuration controls the maximum amount of time the server will wait for acknowledgments
  # from followers to meet the acknowledgment requirements the producer has specified with the
  # acks configuration. If the requested number of acknowledgments are not met when the timeout
  # elapses an error will be returned. This timeout is measured on the server side and does not
  # include the network latency of the request.
  config :timeout_ms, :validate => :number, :default => 30000
  # When our memory buffer is exhausted we must either stop accepting new
  # records (block) or throw errors. By default this setting is true and we block,
  # however in some scenarios blocking is not desirable and it is better to immediately give an error.
  config :block_on_buffer_full, :validate => :boolean, :default => true
  # the timeout setting for initial metadata request to fetch topic metadata.
  config :metadata_fetch_timeout_ms, :validate => :number, :default => 60000
  # the max time in milliseconds before a metadata refresh is forced.
  config :metadata_max_age_ms, :validate => :number, :default => 300000
  # The amount of time to wait before attempting to reconnect to a given host when a connection fails.
  config :reconnect_backoff_ms, :validate => :number, :default => 10
  # The amount of time to wait before attempting to retry a failed produce request to a given topic partition.
  config :retry_backoff_ms, :validate => :number, :default => 100

  public
  def register
    LogStash::Logger.setup_log4j(@logger)

    options = {
      :key_serializer => @key_serializer,
      :value_serializer => @value_serializer,
      :bootstrap_servers => @bootstrap_servers,
      :acks => @acks,
      :buffer_memory => @buffer_memory,
      :compression_type => @compression_type,
      :retries => @retries,
      :batch_size => @batch_size,
      :client_id => @client_id,
      :linger_ms => @linger_ms,
      :max_request_size => @max_request_size,
      :receive_buffer_bytes => @receive_buffer_bytes,
      :send_buffer_bytes => @send_buffer_bytes,
      :timeout_ms => @timeout_ms,
      :block_on_buffer_full => @block_on_buffer_full,
      :metadata_fetch_timeout_ms => @metadata_fetch_timeout_ms,
      :metadata_max_age_ms => @metadata_max_age_ms,
      :reconnect_backoff_ms => @reconnect_backoff_ms,
      :retry_backoff_ms => @retry_backoff_ms
    }
    @producer = Kafka::KafkaProducer.new(options)
    @producer.connect

    @logger.info('Registering kafka producer', :topic_id => @topic_id, :bootstrap_servers => @bootstrap_servers)

    @codec.on_event do |event, data|
      begin
        key = if @message_key.nil? then nil else event.sprintf(@message_key) end
        @producer.send_msg(event.sprintf(@topic_id), nil, key, data)
      rescue LogStash::ShutdownSignal
        @logger.info('Kafka producer got shutdown signal')
      rescue => e
        @logger.warn('kafka producer threw exception, restarting',
                     :exception => e)
      end
    end
  end # def register

  def receive(event)
   
    if event == LogStash::SHUTDOWN
      return
    end
    @codec.encode(event)
  end

  def close
    @producer.close
  end
end #class LogStash::Outputs::Kafka

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-233058-1-1.html 上篇帖子: Spring Kafka 1.0.0 M2 发布 下篇帖子: kafka删除topic的方法
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表