Building a Real-Time Notification Service with Golang
Single Responsibility
What are the parts of a simple notification system? I'd say three.
- Communication between the client and the server; it can be WebSocket, HTTP, or gRPC.
- Storage; keeps notifications somewhere so we can access them whenever the user comes online.
- The signal; allows clients to receive notifications in real time.
These are the essential parts, and you could design your software around them in any number of ways; this is the design we'll follow in this article. Each of these parts should be a separate package, because each has a distinct responsibility in the system. You could put all of them in one package and your service would still work, and work fast: separation of responsibilities has nothing to do with the software's performance. It's about our performance as developers, so we can change and extend our system as quickly and easily as possible. This separation, combined with dependency injection, lets us test and develop each part of the system as fast as possible, and it also makes teamwork and parallel work easier.
Dependency Injection
How do these parts fit together? They do it with interfaces. Say we want a service that uses the storage and signal packages to build its business logic. It doesn't receive those packages by their class (or, in Golang, struct) names; instead, it receives them through their interfaces. Why? Because the service then depends on an implementation of an interface, not on a fully implemented struct. How does that benefit the development and testing process?
As I mentioned, instead of depending on fully implemented packages (or fully implemented services), we depend on implementations of interfaces. This allows us to mock the interfaces when running tests. Say you're working on a service that depends on other services. Instead of running a whole other application on your machine with all its dependencies, such as databases, Redis, and so on, you can mock the responses of that service's endpoints. You're also separating developers' concerns: each developer can focus on the most important thing, the part they're working on, rather than the whole application. Therefore, your team members can work on different parts of the application at the same time. Even though those parts ultimately depend on each other, that doesn't affect the development and testing process.
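A minimal sketch of this idea (the Notifier and Service names here are hypothetical, not part of this article's final design): the service depends only on an interface, so a test can inject a mock instead of a real implementation.

```go
package main

import "fmt"

// Notifier is the interface our service depends on.
type Notifier interface {
	Send(userID int, message string) error
}

// Service receives a Notifier, not a concrete struct, so any
// implementation (real or mock) can be injected.
type Service struct {
	notifier Notifier
}

func NewService(n Notifier) *Service { return &Service{notifier: n} }

func (s *Service) Welcome(userID int) error {
	return s.notifier.Send(userID, "welcome!")
}

// mockNotifier records calls so a test can assert on them.
type mockNotifier struct{ sent []string }

func (m *mockNotifier) Send(userID int, message string) error {
	m.sent = append(m.sent, fmt.Sprintf("%d:%s", userID, message))
	return nil
}

func main() {
	m := &mockNotifier{}
	s := NewService(m) // inject the mock instead of a real implementation
	_ = s.Welcome(42)
	fmt.Println(m.sent) // [42:welcome!]
}
```

Swapping in a real implementation later requires no change to Service at all.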
Let's write some simple tests.
Golang gives us a simple yet powerful structure for unit tests. You can write tests right next to the implementation and keep everything organized. Imagine we want to create and test a serializer package that receives some rows from the database (better to call it the repository) and converts the given rows into serializable structs. Someone else is working on the repository, so we don't have access to the implementation. Instead, we've agreed on an interface that satisfies our needs.
package repository
import (
"context"
"errors"
)
var (
ErrNotFound = errors.New("article not found")
)
type Article struct {
ID uint64
Title string
Content string
}
type ArticleRepository interface {
ByID(ctx context.Context, id uint64) (Article, error)
}
We could keep the definitions of errors and entities (like Article) in a separate package, but for this tutorial I'll keep them together. Now, let's implement the serializer package. I have to mention that this implementation is not an accurate example of a real serializer in Golang; I just want to imitate some of its behavior.
type SimpleSummaryArticle struct {
ID uint64 `json:"id"`
Title string `json:"title"`
Summary string `json:"summary"`
More string `json:"more"`
}
First, I created a struct that represents the JSON format of a simple summary article. We would use these, for example, in tight spaces such as menus, sidebars, and so on.
type Article struct {
articles repository.ArticleRepository
summaryWordsLimit int
}
func NewArticle(articles repository.ArticleRepository, summaryWordsLimit int) *Article {
return &Article{articles: articles, summaryWordsLimit: summaryWordsLimit}
}
func (a *Article) ByID(ctx context.Context, id uint64) (SimpleSummaryArticle, error) {
article, err := a.articles.ByID(ctx, id)
if err != nil {
return SimpleSummaryArticle{}, fmt.Errorf("error while retrieving a single article by id: %w", err)
}
return SimpleSummaryArticle{
ID: article.ID,
Title: article.Title,
Summary: a.summarize(article.Content),
More: fmt.Sprintf("https://site.com/a/%d", article.ID),
}, nil
}
func (a *Article) summarize(content string) string {
words := strings.Split(strings.ReplaceAll(content, "\n", " "), " ")
if len(words) > a.summaryWordsLimit {
words = words[:a.summaryWordsLimit]
}
return strings.Join(words, " ")
}
Then I used the article repository to retrieve the data and convert it into the desired value. As you can see, we can now test whether the summarize method works correctly. In Golang, we put tests next to the implementation in files with the _test.go suffix. For example, I named this file article.go, so its test file should be article_test.go. In that test file, I created a mock for the article repository:
type mockArticle struct {
items map[uint64]repository.Article
}
func (m *mockArticle) ByID(ctx context.Context, id uint64) (repository.Article, error) {
val, has := m.items[id]
if !has {
return repository.Article{}, repository.ErrNotFound
}
return val, nil
}
Now, I can easily use this mock to test my serializer package:
func TestArticle_ByID(t *testing.T) {
ma := &mockArticle{items: map[uint64]repository.Article{
1: {
ID: 1,
Title: "Title#1",
Content: "content of the first article.",
},
}}
a := NewArticle(ma, 3)
_, err := a.ByID(context.Background(), 10)
assert.ErrorIs(t, err, repository.ErrNotFound)
item, err := a.ByID(context.Background(), 1)
assert.Equal(t, "https://site.com/a/1", item.More)
assert.Equal(t, uint64(1), item.ID)
assert.Equal(t, "content of the", item.Summary)
}
For the assertion, I've used the github.com/stretchr/testify/assert package. There is an important problem with my code: I haven't used interfaces to describe my serializers. What if someone else were working on another package that needed my serializers? I won't change it here because it's just an example, but you should bear that in mind.
Let's write a benchmark.
Benchmarking in Golang is pretty simple; Golang provides us with powerful utilities for it. You write benchmarks in the same _test.go files. Unlike tests, which have the Test prefix, benchmarks have the Benchmark prefix, and instead of receiving *testing.T they receive *testing.B, which contains an N property that tells us how many times to execute our function.
func BenchmarkArticle(b *testing.B) {
ma := &mockArticle{items: map[uint64]repository.Article{
1: {
ID: 1,
Title: "Title#1",
Content: "content of the first article.",
},
}}
a := NewArticle(ma, 3)
for i := 0; i < b.N; i++ {
a.ByID(context.Background(), 10)
}
}
This is a benchmark I’ve written to see how our serializer is performing.
BenchmarkArticle-6 73575847 15.64 ns/op
Here is the result. It shows that serializing a row takes 15.64 nanoseconds on average. Now let's implement and benchmark the examples I explained in the article "Building an Online Taxi Application Like Uber with Golang — Part 3, Redis to Rescue!".
If you store those posts in a plain array, your code needs to check almost every single post to find those needed posts. Thus, in the worst-case scenario, it needs to have 10,000 comparisons. If your server can handle 1000 comparisons per second, it would take 10 seconds — I know 1000 is a pretty low number, but I want to magnify the effects.
What if we stored those posts in order and used an algorithm like binary search to find them? It would require almost log2(1000)=10 comparisons to find each post and 10*10=100 for the entire page, which takes 100ms. Although that's much faster than the first option, we can speed it up more.
What if we used a map as the data structure to store those posts? Each lookup would take one instruction, so ten instructions are needed to generate the entire page. It would take 10/1000=10ms.
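For completeness, the map-based variant might look like this (MapLookup is a hypothetical helper, not part of this article's benchmark code): build the value-to-index map once, and each subsequent lookup is a single hash probe.

```go
package main

import "fmt"

// MapLookup finds the position of lookup using a prebuilt index
// instead of scanning the slice. Building the index costs O(n) once;
// each lookup is then O(1) on average.
func MapLookup(index map[int]int, lookup int) int {
	if i, ok := index[lookup]; ok {
		return i
	}
	return -1
}

func main() {
	items := []int{10, 20, 30, 40}
	index := make(map[int]int, len(items))
	for i, v := range items {
		index[v] = i // value -> position
	}
	fmt.Println(MapLookup(index, 30)) // 2
	fmt.Println(MapLookup(index, 99)) // -1
}
```

The trade-off is memory: the index duplicates every key, which matters for large datasets.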
// CheckEveryItem looks for the given lookup argument in the slice and returns its index if it is presented.
// Otherwise, it returns -1.
func CheckEveryItem(items []int, lookup int) int {
for i := 0; i < len(items); i++ {
if items[i] == lookup {
return i
}
}
return -1
}
// BinarySearch expects to receive a sorted slice and looks for the index of the given value accordingly.
func BinarySearch(items []int, lookup int) int {
left := 0
right := len(items) - 1
for left <= right {
center := (left + right) / 2
switch {
case items[center] == lookup:
return center
case items[center] > lookup:
right = center - 1
default:
left = center + 1
}
}
return -1
}
As you can see, both of these algorithms have similar behavior. They receive a slice of integers and a number, then search for it and either return the index of the given number or -1 if they haven’t found it. We can write a new type that represents this behavior.
type Algorithm func(items []int, lookup int) int
This type helps us to write tests and benchmarks once instead of writing them for each algorithm.
func testAlgorithm(alg Algorithm, t *testing.T) {
items := []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}
for i := 0; i <= 9; i++ {
assert.Equal(t, i, alg(items, i))
}
assert.Equal(t, -1, alg(items, 100))
}
As you can see, I haven't followed Golang's naming rule for tests: instead of starting with an uppercase letter, I've written them in lowercase. That's because they are not actual tests; we'll use them to write the actual tests.
func TestCheckEveryItem(t *testing.T) {
testAlgorithm(CheckEveryItem, t)
}
func TestBinarySearch(t *testing.T) {
testAlgorithm(BinarySearch, t)
}
As you can see, it's pretty easy to write more tests for these algorithms, and changing the shared helper affects all of them.
func benchmarkAlgorithm(alg Algorithm, b *testing.B) {
totalItems := int(1e3)
items := make([]int, totalItems)
for i := 0; i < totalItems; i++ {
items[i] = i
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
lookup := rand.Intn(totalItems - 1)
alg(items, lookup)
}
}
I’ve done the same thing for the benchmarks. As you can see, it makes a slice with 1000 members, and each time looks for a random number in that range. Pay attention to the b.ResetTimer(). Creating a large slice will take a while, and we don’t want that to affect the benchmark’s result.
func BenchmarkCheckEveryItem(b *testing.B) {
benchmarkAlgorithm(CheckEveryItem, b)
}
func BenchmarkBinarySearch(b *testing.B) {
benchmarkAlgorithm(BinarySearch, b)
}
Let’s launch these tests to see how each algorithm performs.
The CheckEveryItem algorithm took 143.7 ns/op, and BinarySearch took 58.54 ns/op to complete. You may ask: did we do all this to improve the search by a couple of nanoseconds? No. Let's increase the size of the slice to one million.
func benchmarkAlgorithm(alg Algorithm, b *testing.B) {
totalItems := int(1e6) // increased from 1e3 to 1e6
items := make([]int, totalItems)
...
The CheckEveryItem took 199 μs/op (199255 ns/op) to complete, while the binary search stayed in the nanosecond realm with 145.6 ns/op despite that amount of increase in the size of the slice. Let’s try it with a hundred million items.
Since binary search is a logarithmic algorithm, it took just 302.6 ns/op to complete. Meanwhile, for CheckEveryItem, this number was way higher, at 28 ms/op (28973093 ns/op).
Now we know what dependency injection and separation of responsibility are and how we can follow them in Golang, and how we can write tests and benchmarks in Go. Let’s use them to implement the Bolbol service.
Implementing Bolbol
As we’ve discussed before, the design of this notification system will be something like this:
Whenever a new notification is created, it follows these steps.
- The client sends the notification up to the server.
- The service saves that notification in the storage.
- The signal package notifies the customer's pending request that there is a new notification for them.
- The handler retrieves the notification from the storage.
The customer is either online, receiving the updates in real time, or offline, in which case they'll receive the updates whenever they get back online.
Storage package
Let’s create the storage package. First, we need to define its interface:
var ErrEmpty = errors.New("no notifications found")
type Storage interface {
Push(ctx context.Context, clientID int, notification entity.Notification) error
Count(ctx context.Context, clientID int) (int, error)
Pop(ctx context.Context, clientID int) (entity.Notification, error)
PopAll(ctx context.Context, clientID int) ([]entity.Notification, error)
}
What is the entity package? For this system, I've stored all of the structs and types in a package that I call entity. For instance, this is the notification interface and some of its implementations.
type Notification interface {
IsNotification()
}
type BaseNotification struct {
CreatedAt time.Time `json:"createdAt"`
}
func (BaseNotification) IsNotification() {}
type UnreadWorkRequest struct {
BaseNotification
WorkID int `json:"workID"`
Title string `json:"title"`
}
type UnreadMessagesNotification struct {
BaseNotification
Count int `json:"count"`
}
I have two implementations of the storage package. The first one uses a channel as a buffered queue to store notifications in order, and the second one uses a slice of notifications. This is the one using channels:
type memoryWithChannel struct {
storage *sync.Map
size int
}
func NewMemoryWithChannel(size int) Storage {
return &memoryWithChannel{
storage: new(sync.Map),
size: size,
}
}
func (m *memoryWithChannel) Push(ctx context.Context, clientID int, notification entity.Notification) error {
c := m.get(clientID)
if len(c) == m.size {
<-c // drop the oldest item; it needs a garbage collection pass to be deleted from the heap
}
c <- notification
return nil
}
func (m *memoryWithChannel) Count(ctx context.Context, clientID int) (int, error) {
c := m.get(clientID)
return len(c), nil
}
func (m *memoryWithChannel) PopAll(ctx context.Context, clientID int) ([]entity.Notification, error) {
c := m.get(clientID)
l := len(c)
items := make([]entity.Notification, l)
for i := 0; i < l; i++ {
items[i] = <-c
}
return items, nil
}
func (m *memoryWithChannel) Pop(ctx context.Context, clientID int) (entity.Notification, error) {
c := m.get(clientID)
select {
case item := <-c:
return item, nil
default:
return nil, ErrEmpty
}
}
func (m *memoryWithChannel) get(clientID int) chan entity.Notification {
cInf, _ := m.storage.LoadOrStore(clientID, make(chan entity.Notification, m.size))
return cInf.(chan entity.Notification)
}
And this is the one with slices:
type userStorage struct {
mu *sync.Mutex
notifications []entity.Notification
}
type memoryWithList struct {
size int
storage *sync.Map
}
func NewMemoryWithList(size int) Storage {
return &memoryWithList{
size: size,
storage: new(sync.Map),
}
}
func (m *memoryWithList) Push(ctx context.Context, clientID int, notification entity.Notification) error {
item := m.get(ctx, clientID)
item.mu.Lock()
defer item.mu.Unlock()
if len(item.notifications) == m.size {
item.notifications = item.notifications[1:]
}
item.notifications = append(item.notifications, notification)
return nil
}
func (m *memoryWithList) Count(ctx context.Context, clientID int) (int, error) {
item := m.get(ctx, clientID)
return len(item.notifications), nil // len is a thread-safe function
}
func (m *memoryWithList) Pop(ctx context.Context, clientID int) (entity.Notification, error) {
item := m.get(ctx, clientID)
if len(item.notifications) == 0 {
return nil, ErrEmpty
}
item.mu.Lock()
defer item.mu.Unlock()
notification := item.notifications[0]
item.notifications = item.notifications[1:]
return notification, nil
}
func (m *memoryWithList) PopAll(ctx context.Context, clientID int) ([]entity.Notification, error) {
item := m.get(ctx, clientID)
if len(item.notifications) == 0 {
return nil, ErrEmpty
}
item.mu.Lock()
defer item.mu.Unlock()
defer func() {
item.notifications = make([]entity.Notification, 0, m.size)
}()
return item.notifications, nil
}
func (m *memoryWithList) get(ctx context.Context, clientID int) *userStorage {
item, _ := m.storage.LoadOrStore(clientID, &userStorage{mu: new(sync.Mutex), notifications: make([]entity.Notification, 0, m.size)})
return item.(*userStorage)
}
As you can see, when we work with concurrency, we should consider race conditions. Golang provides the sync package to help us handle these problems. These are some tips that I’ve used to write these packages:
- Channels are thread-safe. You can write to and read from them simultaneously from multiple goroutines.
- Plain maps are not thread-safe. Instead of guarding one with a mutex, you can use the built-in sync.Map, which is thread-safe.
- The contents of a sync.Map are not thread-safe. As you can see, I've used a mutex for each slice.
- len is thread-safe. It is one of the few functions that are.
- The size of a channel is fixed. Unlike slices and maps, channels have a fixed capacity: whenever you make one, Go allocates the given size, and you cannot change it later.
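The last point is easy to observe directly: cap reports the fixed capacity chosen when the channel is made, while len reports how many items are currently buffered.

```go
package main

import "fmt"

func main() {
	c := make(chan int, 1000) // capacity is fixed at creation time
	c <- 1                    // one buffered item
	fmt.Println(cap(c), len(c)) // 1000 1
}
```

There is no append-equivalent for channels; to "grow" one you would have to make a new channel and drain the old one into it.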
When you work with concurrency, race conditions are not the only problem you'll face. There is an intentional bug in one of my implementations to show that even if you test your code, you may still miss a situation that only happens while your code is under load, and these are the bugs that cause headaches!
Say we’ve used the implementation with the channel, and we have 100 notifications stored for a user. What would happen if the user calls the PopAll method with two simultaneous requests?
func (m *memoryWithChannel) PopAll(ctx context.Context, clientID int) ([]entity.Notification, error) {
c := m.get(clientID)
l := len(c)
items := make([]entity.Notification, l)
for i := 0; i < l; i++ {
items[i] = <-c
}
return items, nil
}
We won't face a race condition. However, suppose both requests call len(c) at the same time. The l variable is 100 for both, so each of them tries to retrieve 100 items from the channel. Between them they try to receive 200 items while only 100 exist, so at least one of them blocks forever: a deadlock!
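This stale-length failure can be sketched in a few lines. The following is a self-contained toy (with int in place of entity.Notification, and a timeout so the demo itself doesn't hang): two goroutines each observe len(c) == 100 and then try to drain that many items.

```go
package main

import (
	"fmt"
	"time"
)

// drain receives n items from c, where n was observed before draining
// started: the same stale-length pattern as the channel-based PopAll.
func drain(c chan int, n int, done chan<- struct{}) {
	for i := 0; i < n; i++ {
		<-c
	}
	done <- struct{}{}
}

func main() {
	c := make(chan int, 100)
	for i := 0; i < 100; i++ {
		c <- i
	}
	n := len(c) // both goroutines observe a length of 100
	done := make(chan struct{})
	go drain(c, n, done)
	go drain(c, n, done)

	// Between them they try to receive 200 items from a channel that
	// only holds 100, so at most one of the two readers can ever finish.
	finished := 0
	timeout := time.After(200 * time.Millisecond)
loop:
	for {
		select {
		case <-done:
			finished++
		case <-timeout:
			break loop
		}
	}
	fmt.Println(finished <= 1) // true: at least one reader is stuck forever
}
```

The slice-based PopAll shown next avoids this by holding the per-user mutex for the whole operation.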
func (m *memoryWithList) PopAll(ctx context.Context, clientID int) ([]entity.Notification, error) {
item := m.get(ctx, clientID)
if len(item.notifications) == 0 {
return nil, ErrEmpty
}
item.mu.Lock()
defer item.mu.Unlock()
defer func() {
item.notifications = make([]entity.Notification, 0, m.size)
}()
return item.notifications, nil
}
In the slice-based implementation, each map entry has a mutex that locks the slice whenever PopAll is called for that user and doesn't unlock until the whole method has finished. Thus, we don't face this problem.
func testNewMemory(m Storage, t *testing.T) {
ctx := context.Background()
m.Push(ctx, 10, entity.UnreadMessagesNotification{Count: 1})
m.Push(ctx, 10, entity.UnreadMessagesNotification{Count: 2})
m.Push(ctx, 10, entity.UnreadMessagesNotification{Count: 3})
c, _ := m.Count(ctx, 10)
assert.Equal(t, 3, c)
p, err := m.Pop(ctx, 10)
assert.NoError(t, err)
assert.Equal(t, 1, p.(entity.UnreadMessagesNotification).Count)
all, _ := m.PopAll(ctx, 10)
assert.Equal(t, 2, len(all))
for i := 0; i < 15; i++ {
m.Push(ctx, 10, entity.UnreadMessagesNotification{Count: i})
}
f, err := m.Pop(ctx, 10)
assert.NoError(t, err)
assert.Equal(t, 5, f.(entity.UnreadMessagesNotification).Count)
}
func benchmarkMemory_PushAverage(m Storage, b *testing.B) {
ctx := context.Background()
for i := 0; i < b.N; i++ {
id := rand.Intn(1000)
m.Push(ctx, id, entity.UnreadMessagesNotification{Count: i})
}
b.StopTimer()
pkg.PrintMemUsage()
}
func benchmarkMemory_PushNewItem(m Storage, b *testing.B) {
ctx := context.Background()
counter := 0
for i := 0; i < b.N; i++ {
m.Push(ctx, i, entity.UnreadMessagesNotification{Count: i})
counter++
}
b.StopTimer()
b.Log("for ", b.N, " notifications: ")
pkg.PrintMemUsage()
}
I've written these tests and benchmarks just like the algorithm examples. For both implementations, pushing a new notification takes about 3μs on average. But is that enough? What about memory? Does this benchmark reveal a memory leak? During these tests, I used the following functions to print out the memory and garbage collector behavior:
func PrintMemUsage() {
var m runtime.MemStats
runtime.ReadMemStats(&m)
fmt.Printf("Alloc = %v MiB", bToMb(m.Alloc))
fmt.Printf("\tTotalAlloc = %v MiB", bToMb(m.TotalAlloc))
fmt.Printf("\tSys = %v MiB", bToMb(m.Sys))
fmt.Printf("\tNumGC = %v\n", m.NumGC)
}
func bToMb(b uint64) uint64 {
return b / 1024 / 1024
}
Let's see how much memory it allocates.
func BenchmarkMemoryWithChannel_PushNewItem(b *testing.B) {
benchmarkMemory_PushNewItem(NewMemoryWithChannel(1000), b)
}
BenchmarkMemoryWithChannel_PushNewItem
memory_test.go:54: for 1 notifications:
Alloc = 0 MiB TotalAlloc = 0 MiB Sys = 12 MiB NumGC = 2
memory_test.go:54: for 100 notifications:
Alloc = 1 MiB TotalAlloc = 1 MiB Sys = 12 MiB NumGC = 3
memory_test.go:54: for 10000 notifications:
Alloc = 158 MiB TotalAlloc = 160 MiB Sys = 179 MiB NumGC = 9
memory_test.go:54: for 214172 notifications:
Alloc = 3400 MiB TotalAlloc = 3569 MiB Sys = 3562 MiB NumGC = 19
memory_test.go:54: for 286639 notifications:
Alloc = 4544 MiB TotalAlloc = 8121 MiB Sys = 4756 MiB NumGC = 29
BenchmarkMemoryWithChannel_PushNewItem-6 286639 4481 ns/op
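That 4.5GB figure can be sanity-checked with some back-of-the-envelope arithmetic. This sketch assumes each slot of a buffered chan entity.Notification holds a 16-byte interface value (two 8-byte words) on a 64-bit platform; the rest of the measured allocation goes to channel headers, sync.Map entries, and so on.

```go
package main

import "fmt"

func main() {
	users := 286639    // unique client IDs pushed by the benchmark
	bufferSize := 1000 // channel capacity per user
	slotBytes := 16    // assumed size of an interface value on 64-bit platforms

	total := users * bufferSize * slotBytes
	fmt.Printf("%.1f GiB pre-allocated\n", float64(total)/(1<<30)) // 4.3 GiB pre-allocated
}
```

That estimate lands close to the 4.5GB the benchmark reported, which supports the explanation below.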
Let me explain what happened. For 286,639 unique users, it allocates 4.5GB of memory. Why? We pushed almost nothing into each user's storage. As I mentioned before, Golang pre-allocates the memory for channels, and we have 286,639 users, each with a channel with a buffer of 1000 notifications. What about the slices?
func BenchmarkMemoryWithList_PushNewItem(b *testing.B) {
benchmarkMemory_PushNewItem(NewMemoryWithList(1000), b)
}
BenchmarkMemoryWithList_PushNewItem
memory_test.go:54: for 1 notifications:
Alloc = 0 MiB TotalAlloc = 0 MiB Sys = 12 MiB NumGC = 2
memory_test.go:54: for 100 notifications:
Alloc = 1 MiB TotalAlloc = 1 MiB Sys = 12 MiB NumGC = 3
memory_test.go:54: for 10000 notifications:
Alloc = 158 MiB TotalAlloc = 160 MiB Sys = 175 MiB NumGC = 9
memory_test.go:54: for 285522 notifications:
Alloc = 4511 MiB TotalAlloc = 4679 MiB Sys = 4722 MiB NumGC = 19
BenchmarkMemoryWithList_PushNewItem-6 285522 4157 ns/op
The performance and memory usage are almost identical. Why? This memory usage comes from the following lines:
func (m *memoryWithList) get(ctx context.Context, clientID int) *userStorage {
item, _ := m.storage.LoadOrStore(clientID, &userStorage{mu: new(sync.Mutex), notifications: make([]entity.Notification, 0, m.size)})
return item.(*userStorage)
}
With notifications: make([]entity.Notification, 0, m.size), we told Golang to pre-allocate those 1000 items, just like the channels do. However, slices support dynamic length, so instead of pre-allocating, we can allocate memory only when it's needed:
func (m *memoryWithList) get(ctx context.Context, clientID int) *userStorage {
item, _ := m.storage.LoadOrStore(clientID, &userStorage{mu: new(sync.Mutex)})
return item.(*userStorage)
}
We also need to change the PopAll function:
func (m *memoryWithList) PopAll(ctx context.Context, clientID int) ([]entity.Notification, error) {
item := m.get(ctx, clientID)
if len(item.notifications) == 0 {
return nil, ErrEmpty
}
item.mu.Lock()
defer item.mu.Unlock()
defer func() {
item.notifications = nil
}()
return item.notifications, nil
}
Let's run the benchmark again:
BenchmarkMemoryWithList_PushNewItem
memory_test.go:54: for 1 notifications:
Alloc = 0 MiB TotalAlloc = 0 MiB Sys = 12 MiB NumGC = 2
memory_test.go:54: for 100 notifications:
Alloc = 0 MiB TotalAlloc = 0 MiB Sys = 12 MiB NumGC = 3
memory_test.go:54: for 10000 notifications:
Alloc = 2 MiB TotalAlloc = 2 MiB Sys = 13 MiB NumGC = 4
memory_test.go:54: for 1000000 notifications:
Alloc = 201 MiB TotalAlloc = 234 MiB Sys = 227 MiB NumGC = 12
memory_test.go:54: for 1521570 notifications:
Alloc = 264 MiB TotalAlloc = 530 MiB Sys = 291 MiB NumGC = 20
memory_test.go:54: for 1983344 notifications:
Alloc = 400 MiB TotalAlloc = 993 MiB Sys = 439 MiB NumGC = 29
BenchmarkMemoryWithList_PushNewItem-6 1983344 740.2 ns/op
As you can see, instead of pre-allocating 4.5GB, it allocates 400MB. Right? No, it's even better than that. The previous benchmark ran for 285,522 items, while this one made 1,983,344 items, roughly seven times as many. So I fixed b.N at 285522 to compare like for like; let's see how it performs:
BenchmarkMemoryWithList_PushNewItem
memory_test.go:55: for 285522 notifications:
Alloc = 54 MiB TotalAlloc = 62 MiB Sys = 71 MiB NumGC = 7
Almost 54MB, which is much better than the previous result.
Signal package
Now it's time to implement the signal package. As you may have guessed, we use channels to transmit signals.
var (
ErrEmpty = errors.New("no topic found")
)
type Signal interface {
Subscribe(id string) (<-chan struct{}, func(), error)
Publish(id string) error
}
The Publish function seems pretty straightforward. The Subscribe function receives a topic ID and returns a read-only channel, a cancel function, and possibly an error. What is the cancel function? You may have seen that some packages return a cancel function that we must call at the end of the process; otherwise, we may face a memory leak. Look at this implementation:
type topic struct {
listeners []chan<- struct{}
mu *sync.Mutex
}
type signal struct {
listeners *sync.Map
}
func NewSignal() Signal {
return &signal{
listeners: new(sync.Map),
}
}
func (c *signal) Subscribe(id string) (<-chan struct{}, func(), error) {
topicInf, _ := c.listeners.LoadOrStore(id, &topic{mu: new(sync.Mutex)})
t := topicInf.(*topic)
t.mu.Lock()
defer t.mu.Unlock()
ch := make(chan struct{}, 1)
t.listeners = append(t.listeners, ch)
return ch, func() {
t.mu.Lock()
defer t.mu.Unlock()
for i := 0; i < len(t.listeners); i++ {
if t.listeners[i] == ch {
t.listeners = append(t.listeners[:i], t.listeners[i+1:]...)
}
}
}, nil
}
func (c *signal) Publish(id string) error {
topicInf, ok := c.listeners.Load(id)
if !ok {
return ErrEmpty
}
t := topicInf.(*topic)
t.mu.Lock() // guard listeners against concurrent Subscribe/cancel calls
defer t.mu.Unlock()
if len(t.listeners) == 0 {
return ErrEmpty
}
for i := 0; i < len(t.listeners); i++ {
select {
case t.listeners[i] <- struct{}{}:
default: // the listener already has a pending signal; don't block while holding the lock
}
}
return nil
}
When the Subscribe function is called, it creates a new channel and stores it alongside the other channels of that particular topic. But what happens to that channel when the user disconnects? We haven't sent them a notification yet, and the channel is still open. We use the cancel functions to remove the channel from the slice when needed, preventing a memory leak. I've also written this test to check the functionality of this package:
func TestNewChannel(t *testing.T) {
s := NewSignal()
ac, _, err := s.Subscribe("a")
assert.NoError(t, err)
ac2, _, err := s.Subscribe("a")
assert.NoError(t, err)
s.Publish("a")
select {
case <-ac:
default:
t.Fatal("didn't receive the signal")
}
select {
case <-ac2:
default:
t.Fatal("didn't receive the signal")
}
err = s.Publish("b")
assert.ErrorIs(t, err, ErrEmpty)
_, cancel, err := s.Subscribe("c")
assert.NoError(t, err)
cancel()
err = s.Publish("c")
assert.ErrorIs(t, err, ErrEmpty)
}
Implementing the Bolbol package
Now that we've written the storage and signal packages, we need to create our application using them. I created a new package called Bolbol and moved the storage, entity, and signal packages into it. (It doesn't change anything; it's just a personal preference.)
type Bolbol struct {
Storage storage.Storage
Signal signal.Signal
defaultTimeout time.Duration
}
func NewBolbol(str storage.Storage, sig signal.Signal) *Bolbol {
return &Bolbol{
Storage: str,
Signal: sig,
defaultTimeout: 2 * time.Minute, //todo: read this value from a configmap or environments
}
}
func Build() *Bolbol {
str := storage.NewMemoryWithChannel(100)
sig := signal.NewSignal()
return NewBolbol(str, sig)
}
This is the bolbol.go file. As you can see, we have the NewBolbol function, which is just another constructor, and the Build function, which initializes the dependencies and then creates a Bolbol with them. We also need two functions: the first lets users listen for their notifications, and the second lets users push new notifications.
func (b *Bolbol) GetNotifications(ctx context.Context, clientID int) ([]entity.Notification, error) {
c, err := b.Storage.Count(ctx, clientID)
if err != nil {
return nil, fmt.Errorf("error while counting user's notifications: %w", err)
}
if c > 0 {
return b.Storage.PopAll(ctx, clientID)
}
ch, cancel, err := b.Signal.Subscribe("user#" + strconv.Itoa(clientID))
if err != nil {
return nil, fmt.Errorf("error while trying to listen on notification topic: %w", err)
}
defer cancel()
ctx, ctxCancel := context.WithTimeout(ctx, b.defaultTimeout)
defer ctxCancel()
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-ch:
return b.Storage.PopAll(ctx, clientID)
}
}
I've put the function in a separate file that I call getNotification.go.
func (b *Bolbol) Notify(ctx context.Context, userID int, notification entity.Notification) error {
if err := b.Storage.Push(ctx, userID, notification); err != nil {
return fmt.Errorf("error while trying to push the new notification: %w", err)
}
_ = b.Signal.Publish("user#" + strconv.Itoa(userID))
return nil
}
Let's see what the structure of the application looks like:
Why didn't I put them all in the root directory as one package? Because we are still missing the last part of the application, the presentation layer. How do users communicate with this package? It doesn't matter. We can write an HTTP long-polling web server, or use the terminal to communicate with the application, or even both. We've separated the concern of communication from the rest of the application, which gives us much more power!
Alright, let's wrap up this tutorial by creating a simple communication layer with long polling. We could talk about communication protocols for days; however, this is already a rather long article.
func (s *Server) listen(c echo.Context) error {
clientId, _ := strconv.Atoi(c.Param("id"))
notifications, err := s.bolbol.GetNotifications(c.Request().Context(), clientId)
if err != nil {
return err
}
return c.JSON(200, notifications)
}
type NotifyRequest struct {
UserID int `json:"userID"`
UnreadMessage *entity.UnreadMessagesNotification `json:"unreadMessage"`
UnreadWorkRequest *entity.UnreadWorkRequest `json:"unreadWorkRequest"`
}
func (n *NotifyRequest) Notification() entity.Notification {
if n.UnreadMessage != nil {
return n.UnreadMessage
}
if n.UnreadWorkRequest != nil {
return n.UnreadWorkRequest
}
panic("bad notification")
}
func (s *Server) notify(c echo.Context) error {
var request NotifyRequest
if err := c.Bind(&request); err != nil {
return err
}
// the notify method's timeout doesn't depend on the request's timeout.
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
if err := s.bolbol.Notify(ctx, request.UserID, request.Notification()); err != nil {
return err
}
return c.String(201, "notification created")
}
Whenever a user connects to the application, it either delivers their stored notifications right away or holds the HTTP connection open until a new notification arrives for them.