开发一个filebeat output websocket插件

举报
张俭 发表于 2023/12/11 10:35:22 2023/12/11
【摘要】 开发一个filebeat的websocket插件, 代码仓地址: https://github.com/shoothzj/beats_output_websocket 引入对beat的依赖go get github.com/elastic/beats/v7 定义在filebeat中的配置文件filebeat通常以配置文件的方式加载插件。让我们定义一下必须的配置,就像elasticsearch...

开发一个filebeat的websocket插件, 代码仓地址: https://github.com/shoothzj/beats_output_websocket

引入对beat的依赖

go get github.com/elastic/beats/v7

定义在filebeat中的配置文件

filebeat通常以配置文件的方式加载插件。让我们定义一下必须的配置,就像elasticsearch中的连接地址等等一样。

output.websocket:
  # worker
  # 用于工作的websocket客户端数量
  workers: 1
  # 日志批量的最大大小
  batch_size: 1
  # 重试的最大次数,0代表不重试
  retry_limit: 1
  # conn
  # ws/wss
  schema: "ws"
  # websocket连接地址
  addr: "localhost:8080"
  # websocket路径
  path: "/echo"
  # websocket心跳间隔,用于保活
  ping_interval: 30

go文件中的配置

type clientConfig struct {
	// Number of worker goroutines publishing log events
	Workers int `config:"workers" validate:"min=1"`
	// Max number of events in a batch to send to a single client
	BatchSize int `config:"batch_size" validate:"min=1"`
	// Max number of retries for single batch of events
	RetryLimit int `config:"retry_limit"`
	// Schema WebSocket Schema
	Schema string `config:"schema"`
	// Addr WebSocket Addr
	Addr string `config:"addr"`
	// Path WebSocket Path
	Path string `config:"path"`
	// PingInterval WebSocket PingInterval
	PingInterval int `config:"ping_interval"`
}

初始化加载插件

加载插件

在某个init函数中注册插件

func init() {
	outputs.RegisterType("websocket", newWsOutput)
}

newWsOutput中卸载配置,并提供配置给WebSocket客户端

func newWsOutput(_ outputs.IndexManager, _ beat.Info, stats outputs.Observer, cfg *common.Config) (outputs.Group, error) {
	config := clientConfig{}
	// 卸载配置,将配置用于初始化WebSocket客户端
	if err := cfg.Unpack(&config); err != nil {
		return outputs.Fail(err)
	}
	clients := make([]outputs.NetworkClient, config.Workers)
	for i := 0; i < config.Workers; i++ {
		clients[i] = &wsClient{
			stats:  stats,
			Schema: config.Schema,
			Host:   config.Addr,
			Path:   config.Path,
			PingInterval: config.PingInterval,
		}
	}

	return outputs.SuccessNet(true, config.BatchSize, config.RetryLimit, clients)
}

初始化WebSocket客户端

WebSocket客户端不仅仅是一个WebSocket客户端,而且还需要实现filebeat中的NetworkClient接口,接下来,让我们来关注接口中的每一个方法的作用及实现

String()接口

String作为客户端的名字,用来标识日志以及指标。是最简单的一个接口

func (w *wsClient) String() string {
	return "websocket"
}

Connect()接口

Connect用来初始化客户端

func (w *wsClient) Connect() error {
	u := url.URL{Scheme: w.Schema, Host: w.Host, Path: w.Path}
	dial, _, err := websocket.DefaultDialer.Dial(u.String(), nil)
	if err == nil {
		w.conn = dial
		ticker := time.NewTicker(time.Duration(w.PingInterval) * time.Second)
		go func() {
			for range ticker.C {
				w.conn.WriteMessage(websocket.PingMessage, nil)
			}
		}()
	} else {
		time.Sleep(10 * time.Second)
	}
	return err
}

注意,这里初始化失败,需要Sleep一段时间,否则,filebeat会一直重试。这绝非是你想要的。或许对于场景来说,退避重试可能会更好

Close()接口

关闭客户端,也是很简单的接口

func (w *wsClient) Close() error {
	return w.conn.Close()
}

Publish()接口

func (w *wsClient) Publish(_ context.Context, batch publisher.Batch) error {
	events := batch.Events()
	// 记录这批日志
	w.stats.NewBatch(len(events))
	failEvents, err := w.PublishEvents(events)
	if err != nil {
		// 如果发送正常,则ACK
		batch.ACK()
	} else {
		// 发送失败,则重试。受RetryLimit的限制
		batch.RetryEvents(failEvents)
	}
	return err
}

func (w *wsClient) PublishEvents(events []publisher.Event) ([]publisher.Event, error) {
	for i, event := range events {
		err := w.publishEvent(&event)
		if err != nil {
			// 如果单条消息发送失败,则将剩余的消息直接重试
			return events[i:], err
		}
	}
	return nil, nil
}

func (w *wsClient) publishEvent(event *publisher.Event) error {
	bytes, err := encode(&event.Content)
	if err != nil {
		// 如果编码失败,就不重试了,重试也不会成功
		// encode error, don't retry.
		// consider being success
		return nil
	}
	err = w.conn.WriteMessage(websocket.TextMessage, bytes)
	if err != nil {
		// 写入WebSocket Server失败
		return err
	}
	return nil
}

编码

编码的逻辑因人而异,事实上,这可能是大家最大的差异所在。这里只是做一个简单地例子

type LogOutput struct {
	Timestamp time.Time `json:"timestamp"`
	Message   string    `json:"message"`
}

func encode(event *beat.Event) ([]byte, error) {
	logOutput := &LogOutput{}
	value, err := event.Fields.GetValue("message")
	if err != nil {
		return nil, err
	}
	logOutput.Timestamp = event.Timestamp
	logOutput.Message = value.(string)
	return json.Marshal(logOutput)
}

最后是我们的wsclient

type wsClient struct {
	// construct field
	Schema       string
	Host         string
	Path         string
	PingInterval int

	stats outputs.Observer
	conn  *websocket.Conn
}

添加额外的功能:大包丢弃

你可能会想保护你的WebSocket服务器,避免接收到超级大的日志。我们可以在配置项中添加一个配置

maxLen用来限制日志长度,超过maxLen的日志直接丢弃。为什么不使用filebeat中的max_bytes

因为filebeatmax_bytes的默认行为是截断,截断的日志在某些场景下不如丢弃。(比如,日志是json格式,截断后格式无法解析)

配置中添加maxLen

  max_len: 1024

省略掉那些重复的添加结构体,读取max_len在encode的时候忽略掉

	s := value.(string)
	if len(s) >= w.MaxLen {
		return nil, err
	}
【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。