From c69251cc2641fa64d93961c2335ea033a30b8243 Mon Sep 17 00:00:00 2001 From: mayiming <1627832236@qq.com> Date: Fri, 15 Aug 2025 03:53:51 +0800 Subject: [PATCH] =?UTF-8?q?=E8=BF=9E=E6=8E=A5=E6=B1=A0=E5=AD=A6=E4=B9=A0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- connectPool/goroutine/readgo.go | 35 ++++++++++++++++++++++++ connectPool/goroutine/updatego.go | 11 ++++++++ connectPool/goroutine/writego.go | 45 +++++++++++++++++++++++++++++++ connectPool/pool.go | 33 +++++++++++++++++++++++ goroutine/consumer.go | 26 ++++++++++++++++++ goroutine/producer.go | 44 ++++++++++++++++++++++++++++++ 6 files changed, 194 insertions(+) create mode 100644 connectPool/goroutine/readgo.go create mode 100644 connectPool/goroutine/updatego.go create mode 100644 connectPool/goroutine/writego.go create mode 100644 connectPool/pool.go create mode 100644 goroutine/consumer.go create mode 100644 goroutine/producer.go diff --git a/connectPool/goroutine/readgo.go b/connectPool/goroutine/readgo.go new file mode 100644 index 0000000..23cb364 --- /dev/null +++ b/connectPool/goroutine/readgo.go @@ -0,0 +1,35 @@ +package goroutine + +import ( + "database/sql" + "fmt" + "math/rand" + "time" +) + +func Dataread(db *sql.DB, donetitle chan struct{}) { + // 启动一个长事务(整个循环在一个事务内,而非每次查询一个事务) + tx, err := db.Begin() + if err != nil { + panic(err) + } + defer tx.Rollback() // 确保退出时回滚(仅为测试) + + // 随机种子(保持不变) + rand.Seed(time.Now().UnixNano()) + + // 只执行两次查询,方便观察结果 + for i := 0; i < 2; i++ { + select { + case <-time.After(2 * time.Second): // 第一次查询后等待2秒,给Datawrite插入时间 + var count int + err = tx.QueryRow("SELECT count(*) FROM article WHERE age > 50 FOR UPDATE ").Scan(&count) + if err != nil { + panic(err) + } + fmt.Printf("第%d次查询 count: %d\n", i+1, count) + case <-donetitle: + return + } + } +} diff --git a/connectPool/goroutine/updatego.go b/connectPool/goroutine/updatego.go new file mode 100644 index 0000000..69c80ca --- /dev/null +++ b/connectPool/goroutine/updatego.go @@ -0,0 +1,11 @@ +package goroutine + +import "database/sql" + +func DataUpdate(db *sql.DB) { + _, err := db.Begin() + if err != nil { + panic(err) + } + +} diff --git a/connectPool/goroutine/writego.go b/connectPool/goroutine/writego.go new file mode 100644 index 0000000..c81580f --- /dev/null +++ b/connectPool/goroutine/writego.go @@ -0,0 +1,45 @@ +package goroutine + +import ( + "database/sql" + "fmt" + "github.com/google/uuid" + "math/rand" + "strconv" + "time" +) + +func Datawrite(db *sql.DB, donetitle chan struct{}) { + timetick := time.Tick(1 * time.Second) + rand.Seed(time.Now().UnixNano()) + + for { + select { + case <-timetick: + uid, error := uuid.NewRandom() + if error != nil { + panic(error) + } + tx, err := db.Begin() + if err != nil { + panic(err) + } + fmt.Println("开始事务") + _, err = tx.Exec(`insert INTO article (uid,content,author,age) values (?,?,?,?)`, uid, "123", "用户"+strconv.Itoa(rand.Intn(1000)), rand.Intn(40)+60) + if err != nil { + if rbErr := tx.Rollback(); rbErr != nil { + panic("回滚失败: " + rbErr.Error()) + } + panic("插入失败: " + err.Error()) // 明确错误类型 + } + err = tx.Commit() + if err != nil { + panic(err) + } + fmt.Println("插入数据成功") + case <-donetitle: + return + } + } + +} diff --git a/connectPool/pool.go b/connectPool/pool.go new file mode 100644 index 0000000..5add96e --- /dev/null +++ b/connectPool/pool.go @@ -0,0 +1,33 @@ +package connectPool + +import ( + "database/sql" + "fmt" + _ "github.com/go-sql-driver/mysql" + "goLearn/connectPool/cursor" +) + +var db *sql.DB + +func ConnectPool(donetitle chan struct{}) { + dsn := "mayiming:Mydream5654my,@tcp(43.142.81.151:3306)/goLearn?charset-uft8mb4&parseTime=True" + db, _ = sql.Open("mysql", dsn) + + err := db.Ping() + if err != nil { + panic(err) + } + fmt.Println("数据库连接成功") + //for i := 0; i < 20; i++ { + // go goroutine.Datawrite(db, donetitle) + //} + //go goroutine.Dataread(db, donetitle) + cursor.Runcorsor(db) +} + +func DisconnectPool() { + err := db.Close() + if err != nil { + panic(err) + } +} diff --git a/goroutine/consumer.go b/goroutine/consumer.go new file mode 100644 index 0000000..302eb37 --- /dev/null +++ b/goroutine/consumer.go @@ -0,0 +1,26 @@ +package goroutine + +import ( + "fmt" + "goLearn/model" + "sync" +) + +func NewConsumer(ch chan model.Task, wg *sync.WaitGroup, done chan struct{}, num int, data *model.Data, rwmutex *sync.RWMutex) { + defer wg.Done() + for { + select { + case <-ch: + rwmutex.RLock() + fmt.Printf("消费者%d号处理消息,统计消息总数为:%d\n", num+1, data.Count) + fmt.Printf("消费者%d号:当前记录详情如下:\n", num+1) + for key, value := range data.Record { + fmt.Printf(" 键:%d,值:%d\n", key, value) + } + rwmutex.RUnlock() + case <-done: + fmt.Printf("消费者%d号退出接收\n", num+1) + return + } + } +} diff --git a/goroutine/producer.go b/goroutine/producer.go new file mode 100644 index 0000000..2f5a583 --- /dev/null +++ b/goroutine/producer.go @@ -0,0 +1,44 @@ +package goroutine + +import ( + "fmt" + "goLearn/model" + "math/rand" + "strconv" + "sync" + "time" +) + +func NewProducer(ch chan model.Task, wg *sync.WaitGroup, done chan struct{}, num int, mutex *sync.Mutex, data *model.Data, rwmutex *sync.RWMutex) { + defer wg.Done() + timech := time.Tick(1 * time.Second) + rand.Seed(time.Now().UnixNano()) + for { + select { + case <-timech: + fmt.Printf("生产者%d号上锁\n", num+1) + randnum := rand.Intn(100) + task := model.Task{ + Id: strconv.Itoa(randnum), + Content: "访问数据库", + } + mutex.Lock() + data.Count += 1 + mutex.Unlock() + + rwmutex.Lock() + data.Record[randnum] += 1 + rwmutex.Unlock() + + fmt.Printf("生产者%d号注入编号为%s,内容为:%s\n", num+1, task.Id, task.Content) + ch <- task + fmt.Printf("生产者%d号释放锁\n", num+1) + + case <-done: + fmt.Printf("生产者%d号退出任务\n", num+1) + return + } + + } + +}