From 73f0802ef7f44138cecf26ae214b3ebe7b7f56e6 Mon Sep 17 00:00:00 2001
From: "Masih H. Derkani"
Date: Mon, 12 Feb 2024 17:15:38 +0000
Subject: [PATCH] Implement opportunistic mirror replicator

Implement a file store that opportunistically replicates data.
---
 filestore/config.go     |  50 +++++++++++++
 filestore/replicator.go | 158 ++++++++++++++++++++++++++++++++++++++++
 2 files changed, 208 insertions(+)
 create mode 100644 filestore/replicator.go

diff --git a/filestore/config.go b/filestore/config.go
index e223983e1..7cc2163e3 100644
--- a/filestore/config.go
+++ b/filestore/config.go
@@ -13,6 +13,8 @@ type Config struct {
 	Local LocalConfig
 	// S3 configures storing files in S3.
 	S3 S3Config
+	// Replicator configures opportunistic mirror replication from a source onto a destination.
+	Replicator *ReplicatorConfig
 }
 
 type LocalConfig struct {
@@ -34,6 +36,24 @@ type S3Config struct {
 	SecretKey string
 }
 
+// ReplicatorConfig configures the two file stores that make up a replicator.
+type ReplicatorConfig struct {
+	// Source configures the store that is fallen back on and replicated from.
+	Source ReplicaConfig
+	// Destination configures the store that is replicated onto.
+	Destination ReplicaConfig
+}
+
+// ReplicaConfig configures one side of a replicator; only "local" and "s3" stores are accepted.
+type ReplicaConfig struct {
+	// Type is the type of file store to use: "local" or "s3".
+	Type string
+	// Local configures storing files in local filesystem.
+	Local LocalConfig
+	// S3 configures storing files in S3.
+	S3 S3Config
+}
+
 // MakeFilestore creates a new storage system of the configured type.
 func MakeFilestore(cfg Config) (Interface, error) {
 	switch cfg.Type {
@@ -45,6 +65,19 @@ func MakeFilestore(cfg Config) (Interface, error) {
 			WithRegion(cfg.S3.Region),
 			WithKeys(cfg.S3.AccessKey, cfg.S3.SecretKey),
 		)
+	case "replicator":
+		if cfg.Replicator == nil {
+			return nil, errors.New("replicator source and destination must be specified")
+		}
+		var rep Replicator
+		var err error
+		if rep.Source, err = makeLocalOrS3Filestore(cfg.Replicator.Source); err != nil {
+			return nil, fmt.Errorf("failed to instantiate replicator source: %w", err)
+		}
+		if rep.Destination, err = makeLocalOrS3Filestore(cfg.Replicator.Destination); err != nil {
+			return nil, fmt.Errorf("failed to instantiate replicator destination: %w", err)
+		}
+		return &rep, nil
 	case "":
 		return nil, errors.New("storage type not defined")
 	case "none":
@@ -52,3 +85,20 @@ func MakeFilestore(cfg Config) (Interface, error) {
 	}
 	return nil, fmt.Errorf("unsupported file storage type: %s", cfg.Type)
 }
+
+// makeLocalOrS3Filestore creates the file store for one side of a replicator.
+// Only the "local" and "s3" types are accepted; any other type, including "", is an error.
+func makeLocalOrS3Filestore(cfg ReplicaConfig) (Interface, error) {
+	switch cfg.Type {
+	case "local":
+		return NewLocal(cfg.Local.BasePath)
+	case "s3":
+		return NewS3(cfg.S3.BucketName,
+			WithEndpoint(cfg.S3.Endpoint),
+			WithRegion(cfg.S3.Region),
+			WithKeys(cfg.S3.AccessKey, cfg.S3.SecretKey),
+		)
+	default:
+		return nil, fmt.Errorf("only local or s3 type is allowed; got: %s", cfg.Type)
+	}
+}
diff --git a/filestore/replicator.go b/filestore/replicator.go
new file mode 100644
index 000000000..167b5477e
--- /dev/null
+++ b/filestore/replicator.go
@@ -0,0 +1,158 @@
+package filestore
+
+import (
+	"context"
+	"errors"
+	"io"
+	"io/fs"
+)
+
+var _ Interface = (*Replicator)(nil)
+
+// Replicator opportunistically replicates data from Replicator.Source onto Replicator.Destination and
+// uses Replicator.Destination whenever possible, falling back on Replicator.Source upon fs.ErrNotExist errors.
+type Replicator struct {
+	// Destination is the mirror: it is tried first and is the only store this type writes to.
+	Destination Interface
+	// Source is consulted only when Destination cannot serve a request; it is never written to here.
+	Source Interface
+}
+
+// Delete deletes path from Replicator.Destination only; Replicator.Source is left untouched.
+func (r *Replicator) Delete(ctx context.Context, path string) error {
+	return r.Destination.Delete(ctx, path)
+}
+
+// Get attempts to get from Replicator.Destination first, and if path does not exist falls back onto Replicator.Source.
+// If the path is found at source, it is replicated onto destination first and then the replica is returned.
+// Any destination error other than fs.ErrNotExist is returned as is, without consulting source.
+func (r *Replicator) Get(ctx context.Context, path string) (*File, io.ReadCloser, error) {
+	dFile, dRc, dErr := r.Destination.Get(ctx, path)
+	if dErr == nil {
+		return dFile, dRc, nil
+	}
+	if !errors.Is(dErr, fs.ErrNotExist) {
+		// A real failure rather than a miss; report it instead of returning a nil error.
+		return nil, nil, dErr
+	}
+	_, sRc, sErr := r.Source.Get(ctx, path)
+	if sErr != nil {
+		// Cannot get from source either; report the destination's not-exist error.
+		return nil, nil, dErr
+	}
+	// The source reader is only needed while replicating; close it on every return path.
+	defer sRc.Close()
+
+	// TODO Consider doing streaming put via Tee once read semantics is understood.
+	// This would reduce IOPS by copying data while it is being read.
+	// However, for this to work properly, the "Get"er should read fully and quickly.
+	// Otherwise, we end up with a lot of half written garbage during streaming Put.
+	// Also concurrency is a concern since in local storage files are written directly
+	// at path, which can interfere with Get on the same path while data is being replicated.
+	// For now, simply Put first then Get to avoid the complexities listed above.
+
+	// Replicate the file into destination.
+	if _, err := r.Destination.Put(ctx, path, sRc); err != nil {
+		return nil, nil, err
+	}
+	// Get it from destination.
+	return r.Destination.Get(ctx, path)
+}
+
+// Head returns the metadata of path from Replicator.Destination, falling back on
+// Replicator.Source if destination returns any error, not only fs.ErrNotExist.
+func (r *Replicator) Head(ctx context.Context, path string) (*File, error) {
+	if dFile, dErr := r.Destination.Head(ctx, path); dErr == nil {
+		return dFile, nil
+	}
+	return r.Source.Head(ctx, path)
+}
+
+// List lists path from Replicator.Destination, falling back on Replicator.Source when
+// destination fails or finishes before listing a single *File; its error, if any, is then dropped.
+// Once destination lists at least one *File, the listing is served from destination only.
+func (r *Replicator) List(ctx context.Context, path string, recursive bool) (<-chan *File, <-chan error) {
+	c := make(chan *File, 1)
+	e := make(chan error, 1) // At most one error is sent before returning, so the send never blocks.
+
+	go func() {
+		defer close(e)
+		defer close(c)
+
+		// send forwards f to the caller, giving up if ctx is done first.
+		send := func(f *File) bool {
+			select {
+			case c <- f:
+				return true
+			case <-ctx.Done():
+				return false
+			}
+		}
+
+		dC, dE := r.Destination.List(ctx, path, recursive)
+		var listedAtLeastOneFile bool
+	DestinationList:
+		for {
+			select {
+			case <-ctx.Done():
+				return
+			case err, ok := <-dE:
+				switch {
+				case !ok:
+					// Closed without an error: stop selecting on it and keep draining files.
+					dE = nil
+				case listedAtLeastOneFile:
+					e <- err
+					return
+				default:
+					break DestinationList
+				}
+			case f, ok := <-dC:
+				switch {
+				case ok:
+					if !send(f) {
+						return
+					}
+					listedAtLeastOneFile = true
+				case listedAtLeastOneFile:
+					return
+				default:
+					break DestinationList
+				}
+			}
+		}
+
+		// Destination listed nothing; list from source, treating a closed channel as done
+		// rather than as a nil *File or a nil error to forward.
+		sC, sE := r.Source.List(ctx, path, recursive)
+		for sC != nil || sE != nil {
+			select {
+			case <-ctx.Done():
+				return
+			case err, ok := <-sE:
+				if ok {
+					e <- err
+					return
+				}
+				sE = nil
+			case f, ok := <-sC:
+				if !ok {
+					sC = nil
+				} else if !send(f) {
+					return
+				}
+			}
+		}
+	}()
+	return c, e
+}
+
+// Put writes to Replicator.Destination only.
+func (r *Replicator) Put(ctx context.Context, path string, reader io.Reader) (*File, error) {
+	return r.Destination.Put(ctx, path, reader)
+}
+
+// Type returns the file store type, which is always "replicator".
+func (r *Replicator) Type() string {
+	return "replicator"
+}