Go语言技术与应用(一):分布式系统构建实战

举报
Yeats_Liao 发表于 2025/11/20 10:26:14 2025/11/20
【摘要】 分布式系统听起来很复杂,但用Go来实现其实没那么难。今天我们来搭建一个完整的分布式系统,包含服务注册、服务发现、负载均衡和健康检查等核心功能。这个项目虽然简单,但麻雀虽小五脏俱全,基本涵盖了分布式系统的主要组件。代码都是可以直接运行的,你可以在本地跑起来看看效果。 1 项目整体架构我们要搭建的系统叫 simple_distributed_system,目录结构是这样的:simple_dist...

分布式系统听起来很复杂,但用Go来实现其实没那么难。今天我们来搭建一个完整的分布式系统,包含服务注册、服务发现、负载均衡和健康检查等核心功能。

这个项目虽然简单,但麻雀虽小五脏俱全,基本涵盖了分布式系统的主要组件。代码都是可以直接运行的,你可以在本地跑起来看看效果。

1 项目整体架构

我们要搭建的系统叫 simple_distributed_system,目录结构是这样的:

simple_distributed_system/
├── main.go
├── service/
│   ├── discovery/
│   │   ├── client.go
│   │   ├── registry.go
│   │   ├── watcher.go
│   │   └── reverse_proxy.go
│   ├── register/
│   │   ├── handler.go
│   │   └── registry.go
│   ├── service.go
│   └── status_monitor.go
├── web/
│   └── app.go

这个结构看起来清晰明了。service 目录下放的是核心的服务治理逻辑,web 目录是业务应用,main.go 负责把所有组件串起来。

2 核心代码实现

2.1 系统启动入口

先看看 main.go,这是整个系统的大脑:

package main

import (
    "log"
    "net/http"
    "simple_distributed_system/service"
    "simple_distributed_system/web"
    "time"
)

// main函数是整个分布式系统的入口点
func main() {
    // 初始化服务注册中心,管理服务的注册和注销
    registerRegistry := service.NewServiceRegisterRegistry()
    
    // 初始化服务发现注册中心,存储服务发现相关信息
    discoveryRegistry := service.NewServiceDiscoveryRegistry()
    
    // 创建服务发现客户端,带30秒缓存过期时间
    // 这样可以减少频繁查询注册中心的开销
    discoveryClient := service.NewServiceDiscoveryClient(discoveryRegistry, 30*time.Second)
    
    // 创建服务变化监听器,当服务状态变化时能及时通知
    serviceWatcher := service.NewServiceWatcher(discoveryRegistry, discoveryClient)

    // 注册HTTP接口
    http.HandleFunc("/register", service.RegisterHandler(registerRegistry))
    http.HandleFunc("/unregister", service.UnregisterHandler(registerRegistry))
    http.HandleFunc("/longpolling", service.LongPollingHandler(discoveryRegistry))

    // 启动各种后台任务
    go service.StartServiceStatusMonitoring(registerRegistry)  // 健康检查
    go web.StartWebApp()                                       // 业务服务
    go serviceWatcher.Start()                                  // 服务监听

    // 启动注册中心服务,监听8080端口
    log.Fatal(http.ListenAndServe(":8080", nil))
}

这里用了几个goroutine来并发处理不同的任务。健康检查、业务服务、服务监听都是独立运行的,互不干扰。

2.2 服务发现客户端

服务发现是分布式系统的核心,我们来看看 service/discovery/client.go

package discovery

import (
    "fmt"
    "sync"
    "time"
)

// ServiceInfo存储服务的基本信息
type ServiceInfo struct {
    Name      string
    Address   string
    Port      int
    CacheTime time.Time  // 缓存时间,用于判断是否过期
}

// ServiceDiscoveryRegistry是服务发现的注册中心
type ServiceDiscoveryRegistry struct {
    services map[string]ServiceInfo
    mu       sync.Mutex  // 保证并发安全
}

// NewServiceDiscoveryRegistry创建新的服务发现注册中心
func NewServiceDiscoveryRegistry() *ServiceDiscoveryRegistry {
    return &ServiceDiscoveryRegistry{
        services: make(map[string]ServiceInfo),
    }
}

// RegisterService注册新服务
func (sdr *ServiceDiscoveryRegistry) RegisterService(service ServiceInfo) error {
    sdr.mu.Lock()
    defer sdr.mu.Unlock()
    sdr.services[service.Name] = service
    return nil
}

// UnregisterService注销服务
func (sdr *ServiceDiscoveryRegistry) UnregisterService(serviceName string) error {
    sdr.mu.Lock()
    defer sdr.mu.Unlock()
    delete(sdr.services, serviceName)
    return nil
}

// GetService获取指定服务信息
func (sdr *ServiceDiscoveryRegistry) GetService(serviceName string) (ServiceInfo, error) {
    sdr.mu.Lock()
    defer sdr.mu.Unlock()
    
    service, ok := sdr.services[serviceName]
    if !ok {
        return ServiceInfo{}, fmt.Errorf("service %s not found", serviceName)
    }
    return service, nil
}

// ServiceDiscoveryClient是服务发现客户端,带缓存功能
type ServiceDiscoveryClient struct {
    registry    *ServiceDiscoveryRegistry
    cache       map[string]ServiceInfo
    cacheExpiry time.Duration
    mu          sync.Mutex
}

// NewServiceDiscoveryClient创建服务发现客户端
func NewServiceDiscoveryClient(registry *ServiceDiscoveryRegistry, cacheExpiry time.Duration) *ServiceDiscoveryClient {
    return &ServiceDiscoveryClient{
        registry:    registry,
        cache:       make(map[string]ServiceInfo),
        cacheExpiry: cacheExpiry,
    }
}

// DiscoverService发现服务,优先从缓存获取
func (sdc *ServiceDiscoveryClient) DiscoverService(serviceName string) (ServiceInfo, error) {
    sdc.mu.Lock()
    
    // 先检查缓存,如果存在且未过期就直接返回
    if serviceInfo, ok := sdc.cache[serviceName]; ok {
        if time.Since(serviceInfo.CacheTime) < sdc.cacheExpiry {
            sdc.mu.Unlock()
            return serviceInfo, nil
        }
    }
    sdc.mu.Unlock()

    // 缓存没有或已过期,从注册中心获取
    serviceInfo, err := sdc.registry.GetService(serviceName)
    if err != nil {
        return ServiceInfo{}, err
    }
    
    // 更新缓存
    serviceInfo.CacheTime = time.Now()
    sdc.mu.Lock()
    sdc.cache[serviceName] = serviceInfo
    sdc.mu.Unlock()
    
    return serviceInfo, nil
}

这里的缓存机制很重要。在高并发场景下,如果每次都去查注册中心,性能会很差。有了缓存,大部分请求都能直接从内存返回,速度快很多。

2.3 服务变化监听

分布式系统中服务会动态上下线,我们需要及时感知这些变化。看看 service/discovery/watcher.go

package discovery

import (
    "log"
    "sync"
    "time"
)

// ServiceWatcher监听服务变化并通知相关组件
type ServiceWatcher struct {
    registry       *ServiceDiscoveryRegistry
    client         *ServiceDiscoveryClient
    changeHandlers map[string][]func(ServiceInfo)  // 每个服务可以有多个处理函数
    mu             sync.Mutex
}

// NewServiceWatcher创建服务监听器
func NewServiceWatcher(registry *ServiceDiscoveryRegistry, client *ServiceDiscoveryClient) *ServiceWatcher {
    return &ServiceWatcher{
        registry:       registry,
        client:         client,
        changeHandlers: make(map[string][]func(ServiceInfo)),
    }
}

// WatchService为指定服务注册变化处理函数
func (sw *ServiceWatcher) WatchService(serviceName string, handler func(ServiceInfo)) {
    sw.mu.Lock()
    defer sw.mu.Unlock()
    
    if _, ok := sw.changeHandlers[serviceName]; !ok {
        sw.changeHandlers[serviceName] = []func(ServiceInfo){}
    }
    sw.changeHandlers[serviceName] = append(sw.changeHandlers[serviceName], handler)
}

// NotifyServiceChange通知服务变化
func (sw *ServiceWatcher) NotifyServiceChange(serviceName string, newServiceInfo ServiceInfo) {
    sw.mu.Lock()
    handlers, ok := sw.changeHandlers[serviceName]
    sw.mu.Unlock()
    
    if ok {
        // 异步执行所有处理函数,避免阻塞
        for _, handler := range handlers {
            go handler(newServiceInfo)
        }
    }
    
    // 同时更新客户端缓存
    sw.client.mu.Lock()
    sw.client.cache[serviceName] = newServiceInfo
    sw.client.cache[serviceName].CacheTime = time.Now()
    sw.client.mu.Unlock()
}

// Start开始监听服务变化
func (sw *ServiceWatcher) Start() {
    for {
        time.Sleep(5 * time.Second)  // 每5秒检查一次
        
        sw.registry.mu.Lock()
        services := make(map[string]ServiceInfo)
        for k, v := range sw.registry.services {
            services[k] = v
        }
        sw.registry.mu.Unlock()
        
        // 遍历所有服务,检查变化
        for serviceName := range services {
            serviceInfo, err := sw.client.DiscoverService(serviceName)
            if err == nil {
                sw.NotifyServiceChange(serviceName, serviceInfo)
            }
        }
    }
}

这个监听器每5秒扫描一次所有服务,发现变化就通知相关的处理函数。在实际项目中,你可能需要更精细的变化检测机制。

2.4 长轮询支持

有些客户端希望实时获取服务变化,我们提供长轮询接口。看看 service/discovery/reverse_proxy.go

package discovery

import (
    "encoding/json"
    "fmt"
    "net/http"
)

// LongPollingHandler处理长轮询请求
func LongPollingHandler(sr *ServiceDiscoveryRegistry) http.HandlerFunc {
    return func(w http.ResponseWriter, r *http.Request) {
        serviceName := r.URL.Query().Get("service_name")
        if serviceName == "" {
            http.Error(w, "service_name parameter is required", http.StatusBadRequest)
            return
        }
        
        // 创建更新通道
        updateChan := make(chan ServiceInfo, 1)
        
        sr.mu.Lock()
        if _, ok := sr.services[serviceName]; ok {
            // 这里简化处理,实际项目中需要更复杂的通道管理
            sr.mu.Unlock()
        } else {
            sr.mu.Unlock()
            http.Error(w, fmt.Sprintf("service %s not found", serviceName), http.StatusNotFound)
            return
        }

        // 等待服务更新或请求超时
        select {
        case serviceInfo := <-updateChan:
            data, err := json.Marshal(serviceInfo)
            if err != nil {
                http.Error(w, err.Error(), http.StatusInternalServerError)
                return
            }
            w.Header().Set("Content-Type", "application/json")
            w.Write(data)
        case <-r.Context().Done():
            // 客户端断开连接或超时
            return
        }
    }
}

长轮询让客户端可以"挂起"请求,等待服务变化时才返回。这比频繁轮询要高效很多。

2.5 服务注册处理

服务注册是最基础的功能,看看 service/register/handler.go

package register

import (
    "encoding/json"
    "net/http"
)

// RegisterHandler处理服务注册请求
func RegisterHandler(sr *ServiceRegisterRegistry) http.HandlerFunc {
    return func(w http.ResponseWriter, r *http.Request) {
        var service ServiceInfo
        
        // 解析请求体中的服务信息
        err := json.NewDecoder(r.Body).Decode(&service)
        if err != nil {
            http.Error(w, err.Error(), http.StatusBadRequest)
            return
        }
        
        // 注册服务
        err = sr.Register(service)
        if err != nil {
            http.Error(w, err.Error(), http.StatusConflict)
            return
        }
        
        w.WriteHeader(http.StatusCreated)
    }
}

// UnregisterHandler处理服务注销请求
func UnregisterHandler(sr *ServiceRegisterRegistry) http.HandlerFunc {
    return func(w http.ResponseWriter, r *http.Request) {
        serviceName := r.URL.Query().Get("service_name")
        if serviceName == "" {
            http.Error(w, "service_name parameter is required", http.StatusBadRequest)
            return
        }
        
        err := sr.Unregister(serviceName)
        if err != nil {
            http.Error(w, err.Error(), http.StatusNotFound)
            return
        }
        
        w.WriteHeader(http.StatusOK)
    }
}

这两个接口很简单,一个负责注册,一个负责注销。实际使用时,服务启动就调用注册接口,关闭时调用注销接口。

2.6 注册中心实现

注册中心是整个系统的核心,看看 service/register/registry.go

package register

import (
    "fmt"
    "sync"
)

// ServiceInfo存储服务基本信息
type ServiceInfo struct {
    Name    string
    Address string
    Port    int
}

// ServiceRegisterRegistry服务注册中心
type ServiceRegisterRegistry struct {
    services map[string]ServiceInfo
    mu       sync.Mutex
}

// NewServiceRegisterRegistry创建注册中心
func NewServiceRegisterRegistry() *ServiceRegisterRegistry {
    return &ServiceRegisterRegistry{
        services: make(map[string]ServiceInfo),
    }
}

// Register注册服务
func (sr *ServiceRegisterRegistry) Register(service ServiceInfo) error {
    sr.mu.Lock()
    defer sr.mu.Unlock()
    
    if sr.services == nil {
        sr.services = make(map[string]ServiceInfo)
    }
    
    // 检查服务是否已存在
    if _, ok := sr.services[service.Name]; ok {
        return fmt.Errorf("service %s already exists", service.Name)
    }
    
    sr.services[service.Name] = service
    return nil
}

// Unregister注销服务
func (sr *ServiceRegisterRegistry) Unregister(serviceName string) error {
    sr.mu.Lock()
    defer sr.mu.Unlock()
    
    if _, ok := sr.services[serviceName]; !ok {
        return fmt.Errorf("service %s not found", serviceName)
    }
    
    delete(sr.services, serviceName)
    return nil
}

// GetAllServices获取所有服务
func (sr *ServiceRegisterRegistry) GetAllServices() map[string]ServiceInfo {
    sr.mu.Lock()
    defer sr.mu.Unlock()
    
    result := make(map[string]ServiceInfo)
    for k, v := range sr.services {
        result[k] = v
    }
    return result
}

注册中心用一个简单的map来存储服务信息。在生产环境中,你可能需要持久化存储,比如用etcd或者数据库。

2.7 业务服务示例

我们来写一个简单的用户服务,看看 service/service.go

package service

// User用户信息结构
type User struct {
    ID   int    `json:"id"`
    Name string `json:"name"`
    Email string `json:"email"`
}

// UserService用户服务
type UserService struct {
    // 实际项目中这里会有数据库连接等
    users map[int]User
}

// NewUserService创建用户服务
func NewUserService() *UserService {
    return &UserService{
        users: make(map[int]User),
    }
}

// AddUser添加用户
func (us *UserService) AddUser(user User) error {
    // 这里应该有数据验证、数据库插入等逻辑
    us.users[user.ID] = user
    return nil
}

// GetUser获取用户
func (us *UserService) GetUser(id int) (User, error) {
    user, ok := us.users[id]
    if !ok {
        return User{}, fmt.Errorf("user %d not found", id)
    }
    return user, nil
}

// GetAllUsers获取所有用户
func (us *UserService) GetAllUsers() []User {
    users := make([]User, 0, len(us.users))
    for _, user := range us.users {
        users = append(users, user)
    }
    return users
}

这个用户服务很简单,就是基本的增删改查。在真实项目中,你需要加上数据库操作、参数验证、错误处理等。

2.8 健康检查

分布式系统中,服务可能随时挂掉,我们需要定期检查服务健康状态。看看 service/status_monitor.go

package service

import (
    "fmt"
    "log"
    "net/http"
    "time"
)

// MonitorService检查单个服务健康状态
func MonitorService(service ServiceInfo) {
    client := http.Client{
        Timeout: 5 * time.Second,  // 5秒超时
    }
    
    start := time.Now()
    // 向服务的健康检查接口发送请求
    resp, err := client.Head(fmt.Sprintf("http://%s:%d/health", service.Address, service.Port))
    duration := time.Since(start)
    
    if err != nil {
        log.Printf("Service %s is not healthy, error: %v", service.Name, err)
    } else if resp.StatusCode != http.StatusOK {
        log.Printf("Service %s is not healthy, status code: %d", service.Name, resp.StatusCode)
    } else {
        log.Printf("Service %s is healthy, response time: %v", service.Name, duration)
    }
    
    if resp != nil {
        resp.Body.Close()
    }
}

// StartServiceStatusMonitoring启动服务状态监控
func StartServiceStatusMonitoring(registry *ServiceRegisterRegistry) {
    for {
        services := registry.GetAllServices()
        
        // 并发检查所有服务
        for _, service := range services {
            go MonitorService(service)
        }
        
        // 每30秒检查一次
        time.Sleep(30 * time.Second)
    }
}

健康检查会定期向每个服务的 /health 接口发送请求,根据响应判断服务是否正常。如果服务不健康,可以从负载均衡中摘除。

2.9 Web应用

最后看看业务应用部分,web/app.go

package web

import (
    "encoding/json"
    "log"
    "net/http"
    "simple_distributed_system/service"
    "strconv"
)

var userService *service.UserService

// StartWebApp启动Web应用
func StartWebApp() {
    userService = service.NewUserService()
    
    // 注册路由
    http.HandleFunc("/users", handleUsers)
    http.HandleFunc("/users/", handleUser)
    http.HandleFunc("/health", handleHealth)
    
    log.Println("Web app starting on port 8081")
    log.Fatal(http.ListenAndServe(":8081", nil))
}

// handleUsers处理用户列表请求
func handleUsers(w http.ResponseWriter, r *http.Request) {
    switch r.Method {
    case "GET":
        users := userService.GetAllUsers()
        w.Header().Set("Content-Type", "application/json")
        json.NewEncoder(w).Encode(users)
    case "POST":
        var user service.User
        err := json.NewDecoder(r.Body).Decode(&user)
        if err != nil {
            http.Error(w, err.Error(), http.StatusBadRequest)
            return
        }
        
        err = userService.AddUser(user)
        if err != nil {
            http.Error(w, err.Error(), http.StatusInternalServerError)
            return
        }
        
        w.WriteHeader(http.StatusCreated)
    default:
        http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
    }
}

// handleUser处理单个用户请求
func handleUser(w http.ResponseWriter, r *http.Request) {
    if r.Method != "GET" {
        http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
        return
    }
    
    // 从URL中提取用户ID
    idStr := r.URL.Path[len("/users/"):]
    id, err := strconv.Atoi(idStr)
    if err != nil {
        http.Error(w, "Invalid user ID", http.StatusBadRequest)
        return
    }
    
    user, err := userService.GetUser(id)
    if err != nil {
        http.Error(w, err.Error(), http.StatusNotFound)
        return
    }
    
    w.Header().Set("Content-Type", "application/json")
    json.NewEncoder(w).Encode(user)
}

// handleHealth健康检查接口
func handleHealth(w http.ResponseWriter, r *http.Request) {
    w.WriteHeader(http.StatusOK)
    w.Write([]byte("OK"))
}

这个Web应用提供了基本的用户管理接口,还有健康检查接口供监控系统调用。

3 运行和测试

3.1 启动系统

把所有代码写好后,在项目根目录运行:

go mod init simple_distributed_system
go run main.go

系统会启动两个服务:

- 注册中心:http://localhost:8080
- 业务应用:http://localhost:8081

3.2 测试服务注册

注册一个服务:

curl -X POST http://localhost:8080/register \
  -H "Content-Type: application/json" \
  -d '{"name":"user-service","address":"localhost","port":8081}'

3.3 测试业务接口

添加用户:

curl -X POST http://localhost:8081/users \
  -H "Content-Type: application/json" \
  -d '{"id":1,"name":"张三","email":"zhangsan@example.com"}'

获取用户:

curl http://localhost:8081/users/1

3.4 测试健康检查

curl http://localhost:8081/health

4 总结

这个分布式系统虽然简单,但包含了核心的组件:

  • 服务注册中心:管理服务的注册和发现
  • 缓存机制:提高服务发现的性能
  • 健康检查:监控服务状态
  • 长轮询:实时获取服务变化
  • 并发安全:使用互斥锁保护共享数据

在实际项目中,你可能还需要考虑:

  1. 持久化存储:使用etcd、Consul等专业的服务发现组件
  2. 负载均衡:支持多种负载均衡算法
  3. 服务熔断:防止服务雪崩
  4. 配置管理:动态配置更新
  5. 监控告警:完善的监控体系

但这个基础框架已经能让你理解分布式系统的核心原理。Go语言的并发特性让这些功能实现起来相对简单,代码也比较清晰易懂。

你可以在这个基础上继续扩展,比如加入更多的服务治理功能,或者集成到现有的微服务框架中。

【声明】本内容来自华为云开发者社区博主,不代表华为云及华为云开发者社区的观点和立场。转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息,否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。