Implement opportunistic mirror replicator #2510

Closed
44 changes: 44 additions & 0 deletions filestore/config.go
@@ -13,6 +13,8 @@ type Config struct {
Local LocalConfig
// S3 configures storing files in S3.
S3 S3Config
// Replicator configures opportunistic mirror replication from a source onto a destination.
Replicator *ReplicatorConfig
Collaborator

It seems that the Replicator should be at a higher level in the config hierarchy, as a peer of filestore.Config, and that instead of a ReplicaConfig type, the Source and Destination should each be of type filestore.Config.

Member Author

I had implemented it that way originally, but settled on a dedicated config type so that a recursive replica source/destination cannot be defined.
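
To make the recursion concern concrete, here is a minimal sketch of the alternative layout, in which both ends of the replicator reuse the full config type. The types below are simplified stand-ins written for this illustration, and `nestingDepth` is a hypothetical helper; none of this is code from the PR.

```go
package main

import "fmt"

// Config is a simplified stand-in for filestore.Config (illustration only).
type Config struct {
	Type       string
	Replicator *ReplicatorConfig
}

// ReplicatorConfig reuses Config for both ends. This is the alternative
// layout under discussion, not the layout the PR implements.
type ReplicatorConfig struct {
	Source      Config
	Destination Config
}

// nestingDepth is a hypothetical helper that reports how many replicator
// layers a config contains.
func nestingDepth(c Config) int {
	if c.Type != "replicator" || c.Replicator == nil {
		return 0
	}
	depth := nestingDepth(c.Replicator.Source)
	if d := nestingDepth(c.Replicator.Destination); d > depth {
		depth = d
	}
	return 1 + depth
}

func main() {
	inner := Config{Type: "replicator", Replicator: &ReplicatorConfig{
		Source:      Config{Type: "s3"},
		Destination: Config{Type: "local"},
	}}
	outer := Config{Type: "replicator", Replicator: &ReplicatorConfig{
		Source:      inner, // a replicator used as a source: the recursive case
		Destination: Config{Type: "local"},
	}}
	fmt.Println(nestingDepth(outer)) // prints 2
}
```

With a dedicated `ReplicaConfig` that has no `Replicator` field, a nested configuration like `outer` cannot be expressed at all, so no runtime check for it is needed.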

}

type LocalConfig struct {
@@ -34,6 +36,20 @@ type S3Config struct {
SecretKey string
}

type ReplicatorConfig struct {
Source ReplicaConfig
Destination ReplicaConfig
}

type ReplicaConfig struct {
// Type is the type of file store to use: "local" or "s3".
Type string
// Local configures storing files in local filesystem.
Local LocalConfig
// S3 configures storing files in S3.
S3 S3Config
}

// MakeFilestore creates a new storage system of the configured type.
func MakeFilestore(cfg Config) (Interface, error) {
switch cfg.Type {
@@ -45,10 +61,38 @@ func MakeFilestore(cfg Config) (Interface, error) {
WithRegion(cfg.S3.Region),
WithKeys(cfg.S3.AccessKey, cfg.S3.SecretKey),
)
case "replicator":
if cfg.Replicator == nil {
return nil, errors.New("replicator source and destination must be specified")
}
var rep Replicator
var err error
if rep.Source, err = makeLocalOrS3Filestore(cfg.Replicator.Source); err != nil {
return nil, fmt.Errorf("failed to instantiate replicator source: %w", err)
}
if rep.Destination, err = makeLocalOrS3Filestore(cfg.Replicator.Destination); err != nil {
return nil, fmt.Errorf("failed to instantiate replicator destination: %w", err)
}
return &rep, nil
case "":
return nil, errors.New("storage type not defined")
case "none":
return nil, nil
}
return nil, fmt.Errorf("unsupported file storage type: %s", cfg.Type)
}

func makeLocalOrS3Filestore(cfg ReplicaConfig) (Interface, error) {
switch cfg.Type {
case "local":
return NewLocal(cfg.Local.BasePath)
case "s3":
return NewS3(cfg.S3.BucketName,
WithEndpoint(cfg.S3.Endpoint),
WithRegion(cfg.S3.Region),
WithKeys(cfg.S3.AccessKey, cfg.S3.SecretKey),
)
default:
return nil, fmt.Errorf("only local or s3 type is allowed; got: %s", cfg.Type)
}
}
133 changes: 133 additions & 0 deletions filestore/replicator.go
@@ -0,0 +1,133 @@
package filestore

import (
"context"
"errors"
"io"
"io/fs"
)

var _ Interface = (*Replicator)(nil)

// Replicator opportunistically replicates data from Replicator.Source onto Replicator.Destination and
// uses Replicator.Destination whenever possible, falling back on Replicator.Source upon fs.ErrNotExist errors.
type Replicator struct {
Destination Interface
Source Interface
}

func (r *Replicator) Delete(ctx context.Context, path string) error {
return r.Destination.Delete(ctx, path)
}

// Get attempts to get from Replicator.Destination first, and if path does not exist falls back onto Replicator.Source.
// If the path is found at source, it is replicated onto destination first and then the replica is returned.
func (r *Replicator) Get(ctx context.Context, path string) (*File, io.ReadCloser, error) {
dFile, dRc, dErr := r.Destination.Get(ctx, path)
Collaborator

The filestore is only used during ingestion of new index data, as a faster alternative to reading from the publisher. The only case where data would be available in the destination is if a new indexer is re-indexing. In which case, wouldn't it be better to support multiple sources (that are not the destination) instead of trying to read from the destination as one of the sources?
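
One way to picture the multi-source idea is an ordered fallback that moves to the next source only on `fs.ErrNotExist`. The sketch below is an assumption-laden illustration: `getter`, `mapSource` and `getFromFirst` are hypothetical names, and a string stands in for the file contents rather than the real `Interface.Get` signature.

```go
package main

import (
	"errors"
	"fmt"
	"io/fs"
)

// getter is a hypothetical, simplified stand-in for a file store's Get.
type getter func(path string) (string, error)

// mapSource builds an in-memory getter, used here only for the example.
func mapSource(files map[string]string) getter {
	return func(path string) (string, error) {
		if v, ok := files[path]; ok {
			return v, nil
		}
		return "", fs.ErrNotExist
	}
}

// getFromFirst tries each source in order and returns the first hit.
// It moves on to the next source only when a source reports fs.ErrNotExist;
// any other error is returned immediately.
func getFromFirst(path string, sources ...getter) (string, error) {
	for _, get := range sources {
		data, err := get(path)
		if err == nil {
			return data, nil
		}
		if !errors.Is(err, fs.ErrNotExist) {
			return "", err
		}
	}
	return "", fs.ErrNotExist
}

func main() {
	fast := mapSource(map[string]string{"a": "from-fast"})
	slow := mapSource(map[string]string{"a": "from-slow", "b": "from-slow"})

	data, err := getFromFirst("b", fast, slow)
	fmt.Println(data, err) // prints: from-slow <nil>

	_, err = getFromFirst("c", fast, slow)
	fmt.Println(errors.Is(err, fs.ErrNotExist)) // prints: true
}
```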

if dErr != nil {
if errors.Is(dErr, fs.ErrNotExist) {
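_, sRc, sErr := r.Source.Get(ctx, path)
if sErr != nil {
// Cannot get from source either; whatever the source error, return the
// destination's not-exist error since it makes no difference to the caller.
return nil, nil, dErr
}
// Close the source reader once replication finishes to avoid leaking it.
defer sRc.Close()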

// TODO: Consider a streaming Put via Tee once the read semantics are understood.
// This would reduce IOPS by copying data while it is being read.
// However, for this to work properly, the caller of Get must read fully and quickly.
// Otherwise, we end up with a lot of half-written garbage during the streaming Put.
// Concurrency is also a concern: in local storage, files are written directly
// at path, which can interfere with a Get on the same path while data is being replicated.
// For now, simply Put first and then Get to avoid the complexities listed above.

// Replicate the file into destination
if _, err := r.Destination.Put(ctx, path, sRc); err != nil {
return nil, nil, err
}
// Get it from destination
return r.Destination.Get(ctx, path)
}
}
// Either a destination hit, or a destination error other than fs.ErrNotExist.
return dFile, dRc, dErr
}
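
The TODO in Get worries about half-written replicas when streaming through a tee. A minimal sketch of that effect, using only the standard library: `io.TeeReader` writes to the replica only what the consumer actually reads, so a consumer that stops early leaves a partial copy. `readThroughTee` is a hypothetical helper, and a `bytes.Buffer` stands in for the destination store.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"strings"
)

// readThroughTee reads at most limit bytes of data through an io.TeeReader
// and returns what the consumer read alongside what reached the replica.
func readThroughTee(data string, limit int64) (consumed, replica string, err error) {
	var dst bytes.Buffer // stands in for the destination store
	tee := io.TeeReader(strings.NewReader(data), &dst)
	b, err := io.ReadAll(io.LimitReader(tee, limit))
	if err != nil {
		return "", "", err
	}
	return string(b), dst.String(), nil
}

func main() {
	// The consumer stops after 5 bytes, so the replica holds only 5 bytes.
	consumed, replica, _ := readThroughTee("index data", 5)
	fmt.Printf("%q %q\n", consumed, replica) // prints: "index" "index"

	// The consumer reads everything, so the replica is complete.
	consumed, replica, _ = readThroughTee("index data", 100)
	fmt.Printf("%q %q\n", consumed, replica) // prints: "index data" "index data"
}
```

This is why the Put-then-Get approach is the simpler choice here: the replica is written in full before any consumer sees it.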

func (r *Replicator) Head(ctx context.Context, path string) (*File, error) {
dFile, dErr := r.Destination.Head(ctx, path)
if dErr != nil {
return r.Source.Head(ctx, path)
}
return dFile, nil
}

func (r *Replicator) List(ctx context.Context, path string, recursive bool) (<-chan *File, <-chan error) {
c := make(chan *File, 1)
e := make(chan error, 1)

go func() {
defer close(e)
defer close(c)

// Attempt listing from Destination first.
// If Destination returns at least one *File before returning any error, then
// keep listing from Destination, whether it returns further *File values or an error.
//
// If Destination returns an error first, then fall back on listing from Source.

{
Collaborator

Unnecessary brace?

Member Author

Only there to separate the scope of working with destination vs source.

dC, dE := r.Destination.List(ctx, path, recursive)
var listedAtLeastOneFile bool
DestinationList:
for {
select {
case <-ctx.Done():
return
case dee, ok := <-dE:
if listedAtLeastOneFile {
if ok {
e <- dee
}
return
} else {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unnecessary else following return. Same below.

break DestinationList
Collaborator

Break not needed, since that is where loop iteration continues naturally.

Member Author

The intent of the break is to stop the iteration and fall back on listing from source. No?
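
For reference, a small self-contained sketch of the Go semantics in question: inside a `select` nested in a `for`, an unlabeled `break` leaves only the `select`, so a labeled `break` is what actually exits the loop. `countUntilClosed` is a hypothetical function written for this illustration.

```go
package main

import "fmt"

// countUntilClosed counts values received from ch until ch is closed or
// done fires.
func countUntilClosed(ch <-chan int, done <-chan struct{}) int {
	n := 0
Loop:
	for {
		select {
		case <-done:
			break Loop
		case _, ok := <-ch:
			if !ok {
				// A bare "break" here would only leave the select and the
				// for loop would spin again; the label exits the loop itself.
				break Loop
			}
			n++
		}
	}
	return n
}

func main() {
	ch := make(chan int, 2)
	ch <- 1
	ch <- 2
	close(ch)
	// done is nil, so only ch is ever selected.
	fmt.Println(countUntilClosed(ch, nil)) // prints 2
}
```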

}
case dcc, ok := <-dC:
if ok {
c <- dcc
listedAtLeastOneFile = true
} else {
if listedAtLeastOneFile {
return
} else {
break DestinationList
}
}
Comment on lines +95 to +104
Collaborator

Suggested change
- if ok {
- c <- dcc
- listedAtLeastOneFile = true
- } else {
- if listedAtLeastOneFile {
- return
- } else {
- break DestinationList
- }
- }
+ if ok {
+ c <- dcc
+ listedAtLeastOneFile = true
+ } else if listedAtLeastOneFile {
+ return
+ }

Member Author

I think the suggested change results in listing from destination continuing, which is not what I intend. Unless I have missed something?

}
}
}

{
sC, sE := r.Source.List(ctx, path, recursive)
for {
select {
case <-ctx.Done():
return
case see, ok := <-sE:
if ok {
e <- see
}
return
case scc, ok := <-sC:
if !ok {
sC = nil // Closed; wait for sE to report an error or close.
continue
}
c <- scc
}
}
}
}()
return c, e
}
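
When consuming a pair of channels like the ones List returns, note that a receive from a closed channel succeeds immediately with the zero value, so the `ok` flag must be checked. A minimal sketch of one way to drain such a pair, assuming the producer closes both channels when done; `collect` and `errBoom` are hypothetical names, and strings stand in for `*File`.

```go
package main

import (
	"errors"
	"fmt"
)

// errBoom is a hypothetical error used only in this example.
var errBoom = errors.New("boom")

// collect drains vals and errs until both are closed, returning the values
// seen so far and the first error received, if any.
func collect(vals <-chan string, errs <-chan error) ([]string, error) {
	var out []string
	for vals != nil || errs != nil {
		select {
		case v, ok := <-vals:
			if !ok {
				vals = nil // a nil channel is never selected again
				continue
			}
			out = append(out, v)
		case err, ok := <-errs:
			if !ok {
				errs = nil
				continue
			}
			return out, err
		}
	}
	return out, nil
}

func main() {
	vals := make(chan string, 2)
	errs := make(chan error, 1)
	vals <- "a"
	vals <- "b"
	close(vals)
	close(errs)
	got, err := collect(vals, errs)
	fmt.Println(got, err) // prints: [a b] <nil>

	errs2 := make(chan error, 1)
	errs2 <- errBoom
	close(errs2)
	_, err = collect(nil, errs2)
	fmt.Println(err) // prints: boom
}
```

Setting a closed channel variable to nil keeps the `select` from spinning on zero values while the other channel is still being drained.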

func (r *Replicator) Put(ctx context.Context, path string, reader io.Reader) (*File, error) {
return r.Destination.Put(ctx, path, reader)
}

func (r *Replicator) Type() string {
return "replicator"
}