Skip to content

Commit 775b338

Browse files
authored
Merge pull request #2 from impossiblecloud/fix-for-slow-uploads
Add file locking to avoid double-processing
2 parents d9be848 + 032c103 commit 775b338

File tree

4 files changed

+52
-3
lines changed

4 files changed

+52
-3
lines changed

.github/PULL_REQUEST_TEMPLATE.md

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,3 @@
1-
## Description
2-
31
<!--
42
Please provide a meaningful description of what this change will do, or is for. Bonus points for including links to related issues, other PRs, or technical references.
53

internal/fs/fs.go

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ import (
1313
"github.com/fsnotify/fsnotify"
1414
)
1515

16+
const lockFilePrefix = "/tmp/s3-file-uploader.lock"
17+
1618
// Check if fs event is the one we care about
1719
func isValidFsEvent(event fsnotify.Event) bool {
1820

@@ -66,7 +68,12 @@ func fsScan(comm *chan cfg.Message, config cfg.AppConfig) {
6668

6769
for _, e := range entries {
6870
//config.Applog.Infof("Found file %q", e.Name())
69-
*comm <- cfg.Message{File: filepath.Join(config.PathToWatch, e.Name())}
71+
filename := filepath.Join(config.PathToWatch, e.Name())
72+
if IsLocked(filename) {
73+
config.Applog.Infof("Found file %q but it's already being processed (lock detected)", filename)
74+
} else {
75+
*comm <- cfg.Message{File: filename}
76+
}
7077
}
7178
}
7279

@@ -190,3 +197,39 @@ func DeleteFile(config cfg.AppConfig, filename string) error {
190197

191198
return nil
192199
}
200+
201+
func getLockFileName(filename string) string {
202+
file := filepath.Base(filename)
203+
return fmt.Sprintf("%s.%s", lockFilePrefix, file)
204+
}
205+
206+
// IsLocked checks if the file is locked
207+
func IsLocked(filename string) bool {
208+
lockFile := getLockFileName(filename)
209+
if _, err := os.Stat(lockFile); err != nil {
210+
return false
211+
}
212+
return true
213+
}
214+
215+
// Lock locks the file for exclusive access for processing
216+
func Lock(filename string, id int) error {
217+
if IsLocked(filename) {
218+
return fmt.Errorf("file %s is already locked", filename)
219+
}
220+
221+
lockFile := getLockFileName(filename)
222+
f, err := os.Create(lockFile)
223+
if err != nil {
224+
return err
225+
}
226+
defer f.Close()
227+
f.WriteString(fmt.Sprintf("%d", id))
228+
return nil
229+
}
230+
231+
// UnLock unlocks the file
232+
func UnLock(filename string) error {
233+
lockFile := getLockFileName(filename)
234+
return os.Remove(lockFile)
235+
}

internal/s3/s3.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,10 @@ package s3
33
import (
44
"fmt"
55
"io"
6+
"math/rand/v2"
67
"os"
78
"path/filepath"
9+
"time"
810

911
"github.com/impossiblecloud/s3-file-uploader/internal/cfg"
1012
"github.com/impossiblecloud/s3-file-uploader/internal/utils"
@@ -74,6 +76,10 @@ func FakeUploadFile(config cfg.AppConfig, filename string) (int64, error) {
7476
return 0, err
7577
}
7678

79+
// Simulate random upload time by sleeping for a random duration
80+
sleepSec := rand.IntN(10) + 5
81+
time.Sleep(time.Duration(sleepSec) * time.Second)
82+
7783
config.Applog.Infof("FAKE UPLOAD TO S3: %q file, size %s", realFile, utils.HumanizeBytes(fi.Size(), false))
7884
return fi.Size(), nil
7985
}

main.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -247,6 +247,7 @@ func worker(wg *sync.WaitGroup, ctx context.Context, id int, config cfg.AppConfi
247247
}
248248

249249
applog.Infof("Worker %d: processing file %q", id, msg.File)
250+
fs.Lock(msg.File, id)
250251

251252
config.Metrics.FileSendCount.WithLabelValues().Inc()
252253
err := sendFileS3(config, client, msg.File)
@@ -256,6 +257,7 @@ func worker(wg *sync.WaitGroup, ctx context.Context, id int, config cfg.AppConfi
256257
} else {
257258
config.Metrics.FileSendSuccess.WithLabelValues().Inc()
258259
}
260+
fs.UnLock(msg.File)
259261
}
260262
}
261263
}

0 commit comments

Comments
 (0)