Go语言技术与应用(一):分布式系统构建实战
分布式系统听起来很复杂,但用Go来实现其实没那么难。今天我们来搭建一个完整的分布式系统,包含服务注册、服务发现、负载均衡和健康检查等核心功能。
这个项目虽然简单,但麻雀虽小五脏俱全,基本涵盖了分布式系统的主要组件。代码都是可以直接运行的,你可以在本地跑起来看看效果。
1 项目整体架构
我们要搭建的系统叫 simple_distributed_system,目录结构是这样的:
simple_distributed_system/
├── main.go
├── service/
│ ├── discovery/
│ │ ├── client.go
│ │ ├── registry.go
│ │ ├── watcher.go
│ │ └── reverse_proxy.go
│ ├── register/
│ │ ├── handler.go
│ │ └── registry.go
│ ├── service.go
│ └── status_monitor.go
├── web/
│ └── app.go
这个结构看起来清晰明了。service 目录下放的是核心的服务治理逻辑,web 目录是业务应用,main.go 负责把所有组件串起来。
2 核心代码实现
2.1 系统启动入口
先看看 main.go,这是整个系统的大脑:
package main
import (
"log"
"net/http"
"simple_distributed_system/service"
"simple_distributed_system/web"
"time"
)
// main函数是整个分布式系统的入口点
func main() {
// 初始化服务注册中心,管理服务的注册和注销
registerRegistry := service.NewServiceRegisterRegistry()
// 初始化服务发现注册中心,存储服务发现相关信息
discoveryRegistry := service.NewServiceDiscoveryRegistry()
// 创建服务发现客户端,带30秒缓存过期时间
// 这样可以减少频繁查询注册中心的开销
discoveryClient := service.NewServiceDiscoveryClient(discoveryRegistry, 30*time.Second)
// 创建服务变化监听器,当服务状态变化时能及时通知
serviceWatcher := service.NewServiceWatcher(discoveryRegistry, discoveryClient)
// 注册HTTP接口
http.HandleFunc("/register", service.RegisterHandler(registerRegistry))
http.HandleFunc("/unregister", service.UnregisterHandler(registerRegistry))
http.HandleFunc("/longpolling", service.LongPollingHandler(discoveryRegistry))
// 启动各种后台任务
go service.StartServiceStatusMonitoring(registerRegistry) // 健康检查
go web.StartWebApp() // 业务服务
go serviceWatcher.Start() // 服务监听
// 启动注册中心服务,监听8080端口
log.Fatal(http.ListenAndServe(":8080", nil))
}
这里用了几个goroutine来并发处理不同的任务。健康检查、业务服务、服务监听都是独立运行的,互不干扰。
2.2 服务发现客户端
服务发现是分布式系统的核心,我们来看看 service/discovery/client.go:
package discovery
import (
"fmt"
"sync"
"time"
)
// ServiceInfo存储服务的基本信息
type ServiceInfo struct {
Name string
Address string
Port int
CacheTime time.Time // 缓存时间,用于判断是否过期
}
// ServiceDiscoveryRegistry是服务发现的注册中心
type ServiceDiscoveryRegistry struct {
services map[string]ServiceInfo
mu sync.Mutex // 保证并发安全
}
// NewServiceDiscoveryRegistry创建新的服务发现注册中心
func NewServiceDiscoveryRegistry() *ServiceDiscoveryRegistry {
return &ServiceDiscoveryRegistry{
services: make(map[string]ServiceInfo),
}
}
// RegisterService注册新服务
func (sdr *ServiceDiscoveryRegistry) RegisterService(service ServiceInfo) error {
sdr.mu.Lock()
defer sdr.mu.Unlock()
sdr.services[service.Name] = service
return nil
}
// UnregisterService注销服务
func (sdr *ServiceDiscoveryRegistry) UnregisterService(serviceName string) error {
sdr.mu.Lock()
defer sdr.mu.Unlock()
delete(sdr.services, serviceName)
return nil
}
// GetService获取指定服务信息
func (sdr *ServiceDiscoveryRegistry) GetService(serviceName string) (ServiceInfo, error) {
sdr.mu.Lock()
defer sdr.mu.Unlock()
service, ok := sdr.services[serviceName]
if !ok {
return ServiceInfo{}, fmt.Errorf("service %s not found", serviceName)
}
return service, nil
}
// ServiceDiscoveryClient是服务发现客户端,带缓存功能
type ServiceDiscoveryClient struct {
registry *ServiceDiscoveryRegistry
cache map[string]ServiceInfo
cacheExpiry time.Duration
mu sync.Mutex
}
// NewServiceDiscoveryClient创建服务发现客户端
func NewServiceDiscoveryClient(registry *ServiceDiscoveryRegistry, cacheExpiry time.Duration) *ServiceDiscoveryClient {
return &ServiceDiscoveryClient{
registry: registry,
cache: make(map[string]ServiceInfo),
cacheExpiry: cacheExpiry,
}
}
// DiscoverService发现服务,优先从缓存获取
func (sdc *ServiceDiscoveryClient) DiscoverService(serviceName string) (ServiceInfo, error) {
sdc.mu.Lock()
// 先检查缓存,如果存在且未过期就直接返回
if serviceInfo, ok := sdc.cache[serviceName]; ok {
if time.Since(serviceInfo.CacheTime) < sdc.cacheExpiry {
sdc.mu.Unlock()
return serviceInfo, nil
}
}
sdc.mu.Unlock()
// 缓存没有或已过期,从注册中心获取
serviceInfo, err := sdc.registry.GetService(serviceName)
if err != nil {
return ServiceInfo{}, err
}
// 更新缓存
serviceInfo.CacheTime = time.Now()
sdc.mu.Lock()
sdc.cache[serviceName] = serviceInfo
sdc.mu.Unlock()
return serviceInfo, nil
}
这里的缓存机制很重要。在高并发场景下,如果每次都去查注册中心,性能会很差。有了缓存,大部分请求都能直接从内存返回,速度快很多。
2.3 服务变化监听
分布式系统中服务会动态上下线,我们需要及时感知这些变化。看看 service/discovery/watcher.go:
package discovery
import (
"log"
"sync"
"time"
)
// ServiceWatcher监听服务变化并通知相关组件
type ServiceWatcher struct {
registry *ServiceDiscoveryRegistry
client *ServiceDiscoveryClient
changeHandlers map[string][]func(ServiceInfo) // 每个服务可以有多个处理函数
mu sync.Mutex
}
// NewServiceWatcher创建服务监听器
func NewServiceWatcher(registry *ServiceDiscoveryRegistry, client *ServiceDiscoveryClient) *ServiceWatcher {
return &ServiceWatcher{
registry: registry,
client: client,
changeHandlers: make(map[string][]func(ServiceInfo)),
}
}
// WatchService为指定服务注册变化处理函数
func (sw *ServiceWatcher) WatchService(serviceName string, handler func(ServiceInfo)) {
sw.mu.Lock()
defer sw.mu.Unlock()
if _, ok := sw.changeHandlers[serviceName]; !ok {
sw.changeHandlers[serviceName] = []func(ServiceInfo){}
}
sw.changeHandlers[serviceName] = append(sw.changeHandlers[serviceName], handler)
}
// NotifyServiceChange通知服务变化
func (sw *ServiceWatcher) NotifyServiceChange(serviceName string, newServiceInfo ServiceInfo) {
sw.mu.Lock()
handlers, ok := sw.changeHandlers[serviceName]
sw.mu.Unlock()
if ok {
// 异步执行所有处理函数,避免阻塞
for _, handler := range handlers {
go handler(newServiceInfo)
}
}
// 同时更新客户端缓存
sw.client.mu.Lock()
sw.client.cache[serviceName] = newServiceInfo
sw.client.cache[serviceName].CacheTime = time.Now()
sw.client.mu.Unlock()
}
// Start开始监听服务变化
func (sw *ServiceWatcher) Start() {
for {
time.Sleep(5 * time.Second) // 每5秒检查一次
sw.registry.mu.Lock()
services := make(map[string]ServiceInfo)
for k, v := range sw.registry.services {
services[k] = v
}
sw.registry.mu.Unlock()
// 遍历所有服务,检查变化
for serviceName := range services {
serviceInfo, err := sw.client.DiscoverService(serviceName)
if err == nil {
sw.NotifyServiceChange(serviceName, serviceInfo)
}
}
}
}
这个监听器每5秒扫描一次所有服务,发现变化就通知相关的处理函数。在实际项目中,你可能需要更精细的变化检测机制。
2.4 长轮询支持
有些客户端希望实时获取服务变化,我们提供长轮询接口。看看 service/discovery/reverse_proxy.go:
package discovery
import (
"encoding/json"
"fmt"
"net/http"
)
// LongPollingHandler处理长轮询请求
func LongPollingHandler(sr *ServiceDiscoveryRegistry) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
serviceName := r.URL.Query().Get("service_name")
if serviceName == "" {
http.Error(w, "service_name parameter is required", http.StatusBadRequest)
return
}
// 创建更新通道
updateChan := make(chan ServiceInfo, 1)
sr.mu.Lock()
if _, ok := sr.services[serviceName]; ok {
// 这里简化处理,实际项目中需要更复杂的通道管理
sr.mu.Unlock()
} else {
sr.mu.Unlock()
http.Error(w, fmt.Sprintf("service %s not found", serviceName), http.StatusNotFound)
return
}
// 等待服务更新或请求超时
select {
case serviceInfo := <-updateChan:
data, err := json.Marshal(serviceInfo)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/json")
w.Write(data)
case <-r.Context().Done():
// 客户端断开连接或超时
return
}
}
}
长轮询让客户端可以"挂起"请求,等待服务变化时才返回。这比频繁轮询要高效很多。
2.5 服务注册处理
服务注册是最基础的功能,看看 service/register/handler.go:
package register
import (
"encoding/json"
"net/http"
)
// RegisterHandler处理服务注册请求
func RegisterHandler(sr *ServiceRegisterRegistry) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
var service ServiceInfo
// 解析请求体中的服务信息
err := json.NewDecoder(r.Body).Decode(&service)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
// 注册服务
err = sr.Register(service)
if err != nil {
http.Error(w, err.Error(), http.StatusConflict)
return
}
w.WriteHeader(http.StatusCreated)
}
}
// UnregisterHandler处理服务注销请求
func UnregisterHandler(sr *ServiceRegisterRegistry) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
serviceName := r.URL.Query().Get("service_name")
if serviceName == "" {
http.Error(w, "service_name parameter is required", http.StatusBadRequest)
return
}
err := sr.Unregister(serviceName)
if err != nil {
http.Error(w, err.Error(), http.StatusNotFound)
return
}
w.WriteHeader(http.StatusOK)
}
}
这两个接口很简单,一个负责注册,一个负责注销。实际使用时,服务启动就调用注册接口,关闭时调用注销接口。
2.6 注册中心实现
注册中心是整个系统的核心,看看 service/register/registry.go:
package register
import (
"fmt"
"sync"
)
// ServiceInfo存储服务基本信息
type ServiceInfo struct {
Name string
Address string
Port int
}
// ServiceRegisterRegistry服务注册中心
type ServiceRegisterRegistry struct {
services map[string]ServiceInfo
mu sync.Mutex
}
// NewServiceRegisterRegistry创建注册中心
func NewServiceRegisterRegistry() *ServiceRegisterRegistry {
return &ServiceRegisterRegistry{
services: make(map[string]ServiceInfo),
}
}
// Register注册服务
func (sr *ServiceRegisterRegistry) Register(service ServiceInfo) error {
sr.mu.Lock()
defer sr.mu.Unlock()
if sr.services == nil {
sr.services = make(map[string]ServiceInfo)
}
// 检查服务是否已存在
if _, ok := sr.services[service.Name]; ok {
return fmt.Errorf("service %s already exists", service.Name)
}
sr.services[service.Name] = service
return nil
}
// Unregister注销服务
func (sr *ServiceRegisterRegistry) Unregister(serviceName string) error {
sr.mu.Lock()
defer sr.mu.Unlock()
if _, ok := sr.services[serviceName]; !ok {
return fmt.Errorf("service %s not found", serviceName)
}
delete(sr.services, serviceName)
return nil
}
// GetAllServices获取所有服务
func (sr *ServiceRegisterRegistry) GetAllServices() map[string]ServiceInfo {
sr.mu.Lock()
defer sr.mu.Unlock()
result := make(map[string]ServiceInfo)
for k, v := range sr.services {
result[k] = v
}
return result
}
注册中心用一个简单的map来存储服务信息。在生产环境中,你可能需要持久化存储,比如用etcd或者数据库。
2.7 业务服务示例
我们来写一个简单的用户服务,看看 service/service.go:
package service
// User用户信息结构
type User struct {
ID int `json:"id"`
Name string `json:"name"`
Email string `json:"email"`
}
// UserService用户服务
type UserService struct {
// 实际项目中这里会有数据库连接等
users map[int]User
}
// NewUserService创建用户服务
func NewUserService() *UserService {
return &UserService{
users: make(map[int]User),
}
}
// AddUser添加用户
func (us *UserService) AddUser(user User) error {
// 这里应该有数据验证、数据库插入等逻辑
us.users[user.ID] = user
return nil
}
// GetUser获取用户
func (us *UserService) GetUser(id int) (User, error) {
user, ok := us.users[id]
if !ok {
return User{}, fmt.Errorf("user %d not found", id)
}
return user, nil
}
// GetAllUsers获取所有用户
func (us *UserService) GetAllUsers() []User {
users := make([]User, 0, len(us.users))
for _, user := range us.users {
users = append(users, user)
}
return users
}
这个用户服务很简单,就是基本的增删改查。在真实项目中,你需要加上数据库操作、参数验证、错误处理等。
2.8 健康检查
分布式系统中,服务可能随时挂掉,我们需要定期检查服务健康状态。看看 service/status_monitor.go:
package service
import (
"fmt"
"log"
"net/http"
"time"
)
// MonitorService检查单个服务健康状态
func MonitorService(service ServiceInfo) {
client := http.Client{
Timeout: 5 * time.Second, // 5秒超时
}
start := time.Now()
// 向服务的健康检查接口发送请求
resp, err := client.Head(fmt.Sprintf("http://%s:%d/health", service.Address, service.Port))
duration := time.Since(start)
if err != nil {
log.Printf("Service %s is not healthy, error: %v", service.Name, err)
} else if resp.StatusCode != http.StatusOK {
log.Printf("Service %s is not healthy, status code: %d", service.Name, resp.StatusCode)
} else {
log.Printf("Service %s is healthy, response time: %v", service.Name, duration)
}
if resp != nil {
resp.Body.Close()
}
}
// StartServiceStatusMonitoring启动服务状态监控
func StartServiceStatusMonitoring(registry *ServiceRegisterRegistry) {
for {
services := registry.GetAllServices()
// 并发检查所有服务
for _, service := range services {
go MonitorService(service)
}
// 每30秒检查一次
time.Sleep(30 * time.Second)
}
}
健康检查会定期向每个服务的 /health 接口发送请求,根据响应判断服务是否正常。如果服务不健康,可以从负载均衡中摘除。
2.9 Web应用
最后看看业务应用部分,web/app.go:
package web
import (
"encoding/json"
"log"
"net/http"
"simple_distributed_system/service"
"strconv"
)
var userService *service.UserService
// StartWebApp启动Web应用
func StartWebApp() {
userService = service.NewUserService()
// 注册路由
http.HandleFunc("/users", handleUsers)
http.HandleFunc("/users/", handleUser)
http.HandleFunc("/health", handleHealth)
log.Println("Web app starting on port 8081")
log.Fatal(http.ListenAndServe(":8081", nil))
}
// handleUsers处理用户列表请求
func handleUsers(w http.ResponseWriter, r *http.Request) {
switch r.Method {
case "GET":
users := userService.GetAllUsers()
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(users)
case "POST":
var user service.User
err := json.NewDecoder(r.Body).Decode(&user)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
err = userService.AddUser(user)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusCreated)
default:
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
}
}
// handleUser处理单个用户请求
func handleUser(w http.ResponseWriter, r *http.Request) {
if r.Method != "GET" {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
}
// 从URL中提取用户ID
idStr := r.URL.Path[len("/users/"):]
id, err := strconv.Atoi(idStr)
if err != nil {
http.Error(w, "Invalid user ID", http.StatusBadRequest)
return
}
user, err := userService.GetUser(id)
if err != nil {
http.Error(w, err.Error(), http.StatusNotFound)
return
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(user)
}
// handleHealth健康检查接口
func handleHealth(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
w.Write([]byte("OK"))
}
这个Web应用提供了基本的用户管理接口,还有健康检查接口供监控系统调用。
3 运行和测试
3.1 启动系统
把所有代码写好后,在项目根目录运行:
go mod init simple_distributed_system
go run main.go
系统会启动两个服务:
- 注册中心:http://localhost:8080
- 业务应用:http://localhost:8081
3.2 测试服务注册
注册一个服务:
curl -X POST http://localhost:8080/register \
-H "Content-Type: application/json" \
-d '{"name":"user-service","address":"localhost","port":8081}'
3.3 测试业务接口
添加用户:
curl -X POST http://localhost:8081/users \
-H "Content-Type: application/json" \
-d '{"id":1,"name":"张三","email":"zhangsan@example.com"}'
获取用户:
curl http://localhost:8081/users/1
3.4 测试健康检查
curl http://localhost:8081/health
4 总结
这个分布式系统虽然简单,但包含了核心的组件:
- 服务注册中心:管理服务的注册和发现
- 缓存机制:提高服务发现的性能
- 健康检查:监控服务状态
- 长轮询:实时获取服务变化
- 并发安全:使用互斥锁保护共享数据
在实际项目中,你可能还需要考虑:
- 持久化存储:使用etcd、Consul等专业的服务发现组件
- 负载均衡:支持多种负载均衡算法
- 服务熔断:防止服务雪崩
- 配置管理:动态配置更新
- 监控告警:完善的监控体系
但这个基础框架已经能让你理解分布式系统的核心原理。Go语言的并发特性让这些功能实现起来相对简单,代码也比较清晰易懂。
你可以在这个基础上继续扩展,比如加入更多的服务治理功能,或者集成到现有的微服务框架中。
- 点赞
- 收藏
- 关注作者
评论(0)