33
goroutine/task.go
Normal file
33
goroutine/task.go
Normal file
@@ -0,0 +1,33 @@
|
|||||||
|
package goroutine
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
clientv3 "go.etcd.io/etcd/client/v3"
|
||||||
|
"goLearn/model"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
func Runtask(ctx context.Context) {
|
||||||
|
ctx2 := ctx.Value("value").(model.Task)
|
||||||
|
fmt.Println(ctx2.Id)
|
||||||
|
|
||||||
|
config := clientv3.Config{
|
||||||
|
Endpoints: []string{"127.0.0.1:2379"}, // etcd节点地址,集群模式可填多个
|
||||||
|
DialTimeout: 5 * time.Second, // 连接超时时间
|
||||||
|
// 如需认证,添加以下配置
|
||||||
|
// Username: "root",
|
||||||
|
// Password: "123456",
|
||||||
|
}
|
||||||
|
|
||||||
|
// 2. 建立连接
|
||||||
|
client, err := clientv3.New(config)
|
||||||
|
if err != nil {
|
||||||
|
fmt.Printf("连接etcd失败: %v\n", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer client.Close() // 程序退出时关闭连接
|
||||||
|
|
||||||
|
fmt.Println("成功连接到etcd")
|
||||||
|
|
||||||
|
}
|
||||||
134
timu.md
Normal file
134
timu.md
Normal file
@@ -0,0 +1,134 @@
|
|||||||
|
基础要求生产者: 2 个协程,每秒生成 1 个随机数(0-100),发送到缓冲通道(容量为 10)。 使用 sync.Mutex保护以下操作: 记录每个数字的生成次数(map[int]int) 统计总生产数量(int)消费者: 3 个协程,从通道读取数据并打印格式:消费者ID: 处理数据X (历史出现次数: Y)。 使用 sync.RWMutex保护对 map[int]int的读取(查询历史次数),写操作仍由 sync.Mutex保护。主协程: 5 秒后关闭通道,等待所有协程退出。 程序退出前,打印最终统计数据: 总生产/消费次数 每个数字的出现频率(按频率降序排列)进阶要求性能优化: 消费者读取历史次数时,必须使用 RLock()允许多个消费者并发查询。 生产者更新 map和计数器时,用最短的锁持有时间(例如:合并写锁操作)。异常处理: 若通道已满,生产者需打印警告并丢弃数据。 消费者检测到通道关闭后立即退出,打印退出日志。扩展功能(可选): 添加一个监控协程,每秒打印当前最频繁生成的 3 个数字(需使用 RWMutex读取数据)。
|
||||||
|
|
||||||
|
```
|
||||||
|
package model
|
||||||
|
|
||||||
|
type Task struct {
|
||||||
|
Id string
|
||||||
|
Content string
|
||||||
|
}
|
||||||
|
|
||||||
|
type Data struct {
|
||||||
|
Count int
|
||||||
|
Record map[int]int
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"goLearn/goroutine"
|
||||||
|
"goLearn/model"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
var Ch = make(chan model.Task, 3)
|
||||||
|
|
||||||
|
var mutex sync.Mutex
|
||||||
|
var rwmutex sync.RWMutex
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
data := &model.Data{
|
||||||
|
Count: 0,
|
||||||
|
Record: make(map[int]int),
|
||||||
|
}
|
||||||
|
donetitle := make(chan struct{})
|
||||||
|
for i := 0; i < 10; i++ {
|
||||||
|
wg.Add(1)
|
||||||
|
go goroutine.NewProducer(Ch, &wg, donetitle, i, &mutex, data, &rwmutex)
|
||||||
|
}
|
||||||
|
for i := 0; i < 3; i++ {
|
||||||
|
wg.Add(1)
|
||||||
|
go goroutine.NewConsumer(Ch, &wg, donetitle, i, data, &rwmutex)
|
||||||
|
}
|
||||||
|
time.Sleep(10 * time.Second)
|
||||||
|
close(donetitle)
|
||||||
|
wg.Wait()
|
||||||
|
fmt.Println("任务完成")
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
package goroutine
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"goLearn/model"
|
||||||
|
"math/rand"
|
||||||
|
"strconv"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
func NewProducer(ch chan model.Task, wg *sync.WaitGroup, done chan struct{}, num int, mutex *sync.Mutex, data *model.Data, rwmutex *sync.RWMutex) {
|
||||||
|
defer wg.Done()
|
||||||
|
timech := time.Tick(1 * time.Second)
|
||||||
|
rand.Seed(time.Now().UnixNano())
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-timech:
|
||||||
|
fmt.Printf("生产者%d号上锁\n", num+1)
|
||||||
|
randnum := rand.Intn(100)
|
||||||
|
task := model.Task{
|
||||||
|
Id: strconv.Itoa(randnum),
|
||||||
|
Content: "访问数据库",
|
||||||
|
}
|
||||||
|
mutex.Lock()
|
||||||
|
data.Count += 1
|
||||||
|
mutex.Unlock()
|
||||||
|
|
||||||
|
rwmutex.Lock()
|
||||||
|
data.Record[randnum] += 1
|
||||||
|
rwmutex.Unlock()
|
||||||
|
|
||||||
|
fmt.Printf("生产者%d号注入编号为%s,内容为:%s\n", num+1, task.Id, task.Content)
|
||||||
|
ch <- task
|
||||||
|
fmt.Printf("生产者%d号释放锁\n", num+1)
|
||||||
|
|
||||||
|
case <-done:
|
||||||
|
fmt.Printf("生产者%d号退出任务\n", num+1)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
package goroutine
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"goLearn/model"
|
||||||
|
"sync"
|
||||||
|
)
|
||||||
|
|
||||||
|
func NewConsumer(ch chan model.Task, wg *sync.WaitGroup, done chan struct{}, num int, data *model.Data, rwmutex *sync.RWMutex) {
|
||||||
|
defer wg.Done()
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ch:
|
||||||
|
rwmutex.RLock()
|
||||||
|
fmt.Printf("消费者%d号处理消息,统计消息总数为:%d\n", num+1, data.Count)
|
||||||
|
fmt.Printf("消费者%d号:当前记录详情如下:\n", num+1)
|
||||||
|
for key, value := range data.Record {
|
||||||
|
fmt.Printf(" 键:%d,值:%d\n", key, value)
|
||||||
|
}
|
||||||
|
rwmutex.RUnlock()
|
||||||
|
case <-done:
|
||||||
|
fmt.Printf("消费者%d号退出接收\n", num+1)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
```
|
||||||
|
|
||||||
|
|
||||||
Reference in New Issue
Block a user