Ontwikkel een concurrentievermogen Go-app met concurrency patterns
description
Met deze prompt leer je hoe je effectief concurrency in Go toepast, waardoor je applicaties sneller en stabieler worden. Het helpt race conditions voorkomen en verbetert het beheer van systeemresources, wat essentieel is voor schaalbare softwareontwikkeling.
prompt
Help me een Go-applicatie te ontwerpen die gebruikmaakt van concurrency patterns om prestaties te optimaliseren. Beschrijf hoe ik goroutines en channe ...
try_prompt
generate_helper
disclaimerOnPageApi
tags
ai_answers
provider: openai
model: gpt-4.1-nano
Natuurlijk! Hier is een ontwerp voor een Go-applicatie die gebruikmaakt van concurrency patterns, zoals goroutines en channels, voor batchverwerking van bestanden met meerdere gelijktijdige uploads. Het richt zich op het efficiënt en veilig afhandelen van gelijktijdige taken, het voorkomen van race conditions en het beheren van resources.
### Hoofdconcepten
- **Goroutines**: voor gelijktijdige uploads
- **Channels**: voor communicatie en synchronisatie
- **Worker pool patroon**: voor limiet op gelijktijdige uploads
- **WaitGroup**: voor wachten op alle goroutines
- **Context**: voor afbreken en tijdslimieten
---
### Stap 1: Structuur van de applicatie
- Een lijst van bestanden die geüpload moeten worden.
- Een worker pool die een beperkt aantal gelijktijdige uploads toelaat.
- Een manier om resultaten en fouten te verzamelen.
---
### Stap 2: voorbeeldcode
```go
package main
import (
"context"
"fmt"
"sync"
"time"
)
// Simuleert het uploaden van een bestand
func uploadFile(ctx context.Context, filename string) error {
// Simuleer verwerkingstijd
select {
case <-time.After(2 * time.Second):
fmt.Printf("Bestand %s geüpload\n", filename)
return nil
case <-ctx.Done():
return ctx.Err()
}
}
func worker(ctx context.Context, jobs <-chan string, results chan<- string, errors chan<- error, wg *sync.WaitGroup) {
defer wg.Done()
for filename := range jobs {
if err := uploadFile(ctx, filename); err != nil {
errors <- fmt.Errorf("fout bij upload %s: %w", filename, err)
continue
}
results <- filename
}
}
func main() {
// Context met timeout voor afbreken
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
bestanden := []string{"file1.txt", "file2.txt", "file3.txt", "file4.txt", "file5.txt"}
// Channels voor taken en resultaten
jobs := make(chan string)
results := make(chan string)
errorsCh := make(chan error)
var wg sync.WaitGroup
aantalWorkers := 3 // Limiteer gelijktijdige uploads
// Start workers
for i := 0; i < aantalWorkers; i++ {
wg.Add(1)
go worker(ctx, jobs, results, errorsCh, &wg)
}
// Verzamel resultaten en fouten
var uploadWacht sync.WaitGroup
uploadWacht.Add(1)
go func() {
defer uploadWacht.Done()
for {
select {
case res, ok := <-results:
if !ok {
return
}
fmt.Printf("Resultaat: %s succesvol geüpload\n", res)
case err, ok := <-errorsCh:
if !ok {
return
}
fmt.Printf("Fout: %v\n", err)
case <-ctx.Done():
return
}
}
}()
// Verzenden van bestanden naar de jobs channel
go func() {
for _, bestand := range bestanden {
select {
case <-ctx.Done():
close(jobs)
return
case jobs <- bestand:
}
}
close(jobs)
}()
// Wacht tot alle workers klaar zijn
wg.Wait()
close(results)
close(errorsCh)
// Wacht tot de resultaten-verzameling klaar is
uploadWacht.Wait()
fmt.Println("Batch upload voltooid.")
}
```
---
### Uitleg van de code:
- **Context**: Gebruikt om de hele operatie te kunnen afbreken bij bijvoorbeeld timeouts of cancelatie.
- **Workers**: Goroutines die bestanden uploaden, gelimiteerd tot `aantalWorkers`.
- **Channels**:
- `jobs`: voor het doorgeven van bestanden aan workers.
- `results`: voor succesvolle uploads.
- `errorsCh`: voor fouten tijdens upload.
- **Race conditions voorkomen**:
- De kanalen worden correct gesloten nadat alle goroutines klaar zijn.
- `sync.WaitGroup` zorgt dat we wachten tot alle goroutines klaar zijn voordat we afsluiten.
- **Resourcebeheer**:
- `defer cancel()` zorgt dat de context netjes wordt opgeruimd.
- Kanalen worden gesloten zodra niet meer gebruikt.
- Goroutines worden correct beëindigd.
---
### Samenvatting
Met dit patroon maak je gebruik van idiomatische Go-concurrentiestructuren om efficiënt en veilig batchgewijs bestanden te uploaden met gelijktijdigheid en resourcecontrole. Door gebruik te maken van een worker pool, channels en WaitGroups voorkom je race conditions en zorg je voor een gecontroleerde afhandeling van taken.
Heeft u nog verdere vragen of wensen voor meer gedetailleerde functionaliteit?
provider: deepseek
model: deepseek-chat
Ik help je graag een Go-applicatie te ontwerpen voor batchverwerking van bestanden met concurrency. Hier is een complete implementatie:
## Applicatie-architectuur
```go
package main
import (
"context"
"fmt"
"io"
"log"
"mime/multipart"
"os"
"path/filepath"
"sync"
"time"
)
// FileProcessor verwerkt bestanden met concurrency
type FileProcessor struct {
maxWorkers int
batchSize int
uploadDir string
wg sync.WaitGroup
fileQueue chan *FileTask
results chan *ProcessingResult
errorHandler func(error)
}
type FileTask struct {
FileHeader *multipart.FileHeader
Content []byte
ID string
}
type ProcessingResult struct {
TaskID string
FilePath string
Size int64
Duration time.Duration
Error error
}
// NewFileProcessor creëert een nieuwe processor
func NewFileProcessor(maxWorkers, batchSize int, uploadDir string) *FileProcessor {
return &FileProcessor{
maxWorkers: maxWorkers,
batchSize: batchSize,
uploadDir: uploadDir,
fileQueue: make(chan *FileTask, batchSize*2),
results: make(chan *ProcessingResult, batchSize*2),
errorHandler: func(err error) { log.Printf("Verwerkingsfout: %v", err) },
}
}
```
## Worker Pool Implementatie
```go
// StartWorkers initialiseert de worker pool
func (fp *FileProcessor) StartWorkers(ctx context.Context) {
for i := 0; i < fp.maxWorkers; i++ {
fp.wg.Add(1)
go fp.worker(ctx, i)
}
}
// worker verwerkt bestanden uit de queue
func (fp *FileProcessor) worker(ctx context.Context, id int) {
defer fp.wg.Done()
for {
select {
case <-ctx.Done():
return
case task, ok := <-fp.fileQueue:
if !ok {
return
}
fp.processFile(task)
}
}
}
// processFile verwerkt een individueel bestand
func (fp *FileProcessor) processFile(task *FileTask) {
start := time.Now()
result := &ProcessingResult{TaskID: task.ID}
defer func() {
result.Duration = time.Since(start)
fp.results <- result
}()
// Bestandsnaam genereren met timestamp om conflicten te voorkomen
filename := fmt.Sprintf("%s_%d%s",
task.ID,
time.Now().UnixNano(),
filepath.Ext(task.FileHeader.Filename),
)
filePath := filepath.Join(fp.uploadDir, filename)
// Bestand opslaan
if err := os.WriteFile(filePath, task.Content, 0644); err != nil {
result.Error = err
fp.errorHandler(err)
return
}
result.FilePath = filePath
result.Size = int64(len(task.Content))
}
```
## Batch Verwerking met Rate Limiting
```go
// ProcessBatch verwerkt een batch bestanden met rate limiting
func (fp *FileProcessor) ProcessBatch(ctx context.Context, files []*multipart.FileHeader) ([]*ProcessingResult, error) {
var mu sync.Mutex
results := make([]*ProcessingResult, 0, len(files))
// Semaphore voor rate limiting
sem := make(chan struct{}, fp.maxWorkers)
var batchWg sync.WaitGroup
for i, fileHeader := range files {
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
batchWg.Add(1)
// Rate limiting met semaphore
sem <- struct{}{}
go func(idx int, fh *multipart.FileHeader) {
defer batchWg.Done()
defer func() { <-sem }()
content, err := readFileContent(fh)
if err != nil {
fp.errorHandler(err)
return
}
task := &FileTask{
FileHeader: fh,
Content: content,
ID: fmt.Sprintf("file_%d", idx),
}
// Stuur taak naar verwerkingsqueue
select {
case fp.fileQueue <- task:
case <-ctx.Done():
}
}(i, fileHeader)
}
}
// Wacht op batch voltooiing
batchWg.Wait()
// Verzamel resultaten
close(fp.results)
for result := range fp.results {
mu.Lock()
results = append(results, result)
mu.Unlock()
}
return results, nil
}
func readFileContent(fileHeader *multipart.FileHeader) ([]byte, error) {
file, err := fileHeader.Open()
if err != nil {
return nil, err
}
defer file.Close()
return io.ReadAll(file)
}
```
## Main Applicatie met Context Management
```go
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
// Initialiseer processor
processor := NewFileProcessor(
4, // max workers
10, // batch size
"./uploads",
)
// Start worker pool
processor.StartWorkers(ctx)
// Simuleer bestandsuploads (vervang met echte HTTP handler)
fakeFiles := createTestFiles()
// Verwerk batch
results, err := processor.ProcessBatch(ctx, fakeFiles)
if err != nil {
log.Fatalf("Batch verwerking mislukt: %v", err)
}
// Toon resultaten
successCount := 0
for _, result := range results {
if result.Error == nil {
successCount++
log.Printf("Verwerkt: %s (%.2f MB in %v)",
result.FilePath,
float64(result.Size)/(1024*1024),
result.Duration,
)
}
}
log.Printf("Batch voltooid: %d/%d successen", successCount, len(fakeFiles))
}
// Hulpfunctie voor testbestanden
func createTestFiles() []*multipart.FileHeader {
// Implementatie voor testdoeleinden
return []*multipart.FileHeader{}
}
```
## Veiligheidsmaatregelen en Best Practices
```go
// Thread-safe teller voor monitoring
type SafeCounter struct {
mu sync.RWMutex
count int
}
func (sc *SafeCounter) Increment() {
sc.mu.Lock()
defer sc.mu.Unlock()
sc.count++
}
func (sc *SafeCounter) Value() int {
sc.mu.RLock()
defer sc.mu.RUnlock()
return sc.count
}
// Resource cleanup met defer
func (fp *FileProcessor) Close() {
close(fp.fileQueue)
fp.wg.Wait()
close(fp.results)
}
// Error handling wrapper
func withRecovery(fn func()) {
defer func() {
if r := recover(); r != nil {
log.Printf("Panic hersteld: %v", r)
}
}()
fn()
}
```
## Gebruik in HTTP Handler
```go
func uploadHandler(w http.ResponseWriter, r *http.Request) {
if err := r.ParseMultipartForm(32 << 20); err != nil {
http.Error(w, "Ongeldige request", http.StatusBadRequest)
return
}
files := r.MultipartForm.File["files"]
if len(files) == 0 {
http.Error(w, "Geen bestanden ontvangen", http.StatusBadRequest)
return
}
processor := NewFileProcessor(4, len(files), "./uploads")
ctx := r.Context()
processor.StartWorkers(ctx)
defer processor.Close()
results, err := processor.ProcessBatch(ctx, files)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
// Response genereren
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(results)
}
```
## Belangrijke kenmerken:
1. **Worker Pool Pattern**: Beperkt gelijktijdigheid om resource exhaustion te voorkomen
2. **Channel-based Communication**: Veilige data-uitwisseling tussen goroutines
3. **Context Usage**: Proper cancellation en timeout handling
4. **Mutex Protection**: Thread-safe toegang tot gedeelde resources
5. **Error Handling**: Robuuste foutafhandeling met recovery
6. **Resource Management**: Proper cleanup met defer statements
7. **Rate Limiting**: Semaphore pattern voor controlled concurrency
Deze implementatie volgt Go's idiomatische patterns en voorkomt race conditions door careful synchronisatie en channel-based communicatie.