Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Refactor storage factories to hold one configuration #6156

Draft
wants to merge 21 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cmd/jaeger/internal/extension/jaegerstorage/extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,9 @@ func (s *storageExt) Start(_ context.Context, host component.Host) error {
case cfg.Cassandra != nil:
factory, err = cassandra.NewFactoryWithConfig(*cfg.Cassandra, mf, s.telset.Logger)
case cfg.Elasticsearch != nil:
factory, err = es.NewFactoryWithConfig(*cfg.Elasticsearch, mf, s.telset.Logger)
factory, err = es.NewFactoryWithConfig(*cfg.Elasticsearch, es.PrimaryNamespace, mf, s.telset.Logger)
case cfg.Opensearch != nil:
factory, err = es.NewFactoryWithConfig(*cfg.Opensearch, mf, s.telset.Logger)
factory, err = es.NewFactoryWithConfig(*cfg.Opensearch, es.PrimaryNamespace, mf, s.telset.Logger)
}
if err != nil {
return fmt.Errorf("failed to initialize storage '%s': %w", storageName, err)
Expand Down
2 changes: 2 additions & 0 deletions pkg/es/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,8 @@ type Configuration struct {
Tags TagsAsFields `mapstructure:"tags_as_fields"`
// Enabled, if set to true, enables the namespace for storage pointed to by this configuration.
Enabled bool `mapstructure:"-"`
// TODO: revisit if this needed
IsArchive bool
}

// TagsAsFields holds configuration for tag schema.
Expand Down
119 changes: 31 additions & 88 deletions plugin/storage/es/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,15 @@ import (
)

const (
primaryNamespace = "es"
archiveNamespace = "es-archive"
PrimaryNamespace = "es"
ArchiveNamespace = "es-archive"
)

var ( // interface comformance checks
_ storage.Factory = (*Factory)(nil)
_ storage.ArchiveFactory = (*Factory)(nil)
Copy link
Member

@yurishkuro yurishkuro Nov 3, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

need to be very careful about removing ArchiveFactory interface, because query service uses it via runtime cast, so unless we have integration tests (many of them disable archive tests as I recall) you can introduce a breaking change

archiveFactory, ok := storageFactory.(storage.ArchiveFactory)

Copy link
Collaborator Author

@mahadzaryab1 mahadzaryab1 Nov 3, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@yurishkuro yep - noted. I was thinking to avoid breaking changes we can remove the ArchiveFactory within this PR and use an archive storage factory wherever needed.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_ io.Closer = (*Factory)(nil)
_ plugin.Configurable = (*Factory)(nil)
_ storage.Purger = (*Factory)(nil)
_ storage.Factory = (*Factory)(nil)
_ io.Closer = (*Factory)(nil)
_ plugin.Configurable = (*Factory)(nil)
_ storage.Purger = (*Factory)(nil)
)

// Factory implements storage.Factory for Elasticsearch backend.
Expand All @@ -58,26 +57,25 @@ type Factory struct {

newClientFn func(c *config.Configuration, logger *zap.Logger, metricsFactory metrics.Factory) (es.Client, error)

primaryConfig *config.Configuration
archiveConfig *config.Configuration
config *config.Configuration

primaryClient atomic.Pointer[es.Client]
archiveClient atomic.Pointer[es.Client]
client atomic.Pointer[es.Client]

watchers []*fswatcher.FSWatcher
}

// NewFactory creates a new Factory.
func NewFactory() *Factory {
func NewFactory(namespace string) *Factory {
return &Factory{
Options: NewOptions(primaryNamespace, archiveNamespace),
Options: NewOptions(namespace),
newClientFn: config.NewClient,
tracer: otel.GetTracerProvider(),
}
}

func NewFactoryWithConfig(
cfg config.Configuration,
namespace string,
mahadzaryab1 marked this conversation as resolved.
Show resolved Hide resolved
metricsFactory metrics.Factory,
logger *zap.Logger,
) (*Factory, error) {
Expand All @@ -88,19 +86,12 @@ func NewFactoryWithConfig(
defaultConfig := DefaultConfig()
cfg.ApplyDefaults(&defaultConfig)

archive := make(map[string]*namespaceConfig)
archive[archiveNamespace] = &namespaceConfig{
Configuration: cfg,
namespace: archiveNamespace,
}

f := NewFactory()
f := NewFactory(namespace)
f.configureFromOptions(&Options{
Primary: namespaceConfig{
Configuration: cfg,
namespace: primaryNamespace,
namespace: namespace,
},
others: archive,
})
err := f.Initialize(metricsFactory, logger)
if err != nil {
Expand All @@ -123,96 +114,55 @@ func (f *Factory) InitFromViper(v *viper.Viper, _ *zap.Logger) {
// configureFromOptions configures factory from Options struct.
func (f *Factory) configureFromOptions(o *Options) {
f.Options = o
f.primaryConfig = f.Options.GetPrimary()
f.archiveConfig = f.Options.Get(archiveNamespace)
f.config = f.Options.GetPrimary()
}

// Initialize implements storage.Factory.
func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) error {
f.metricsFactory, f.logger = metricsFactory, logger

primaryClient, err := f.newClientFn(f.primaryConfig, logger, metricsFactory)
primaryClient, err := f.newClientFn(f.config, logger, metricsFactory)
if err != nil {
return fmt.Errorf("failed to create primary Elasticsearch client: %w", err)
}
f.primaryClient.Store(&primaryClient)
f.client.Store(&primaryClient)

if f.primaryConfig.Authentication.BasicAuthentication.PasswordFilePath != "" {
primaryWatcher, err := fswatcher.New([]string{f.primaryConfig.Authentication.BasicAuthentication.PasswordFilePath}, f.onPrimaryPasswordChange, f.logger)
if f.config.Authentication.BasicAuthentication.PasswordFilePath != "" {
primaryWatcher, err := fswatcher.New([]string{f.config.Authentication.BasicAuthentication.PasswordFilePath}, f.onPrimaryPasswordChange, f.logger)
if err != nil {
return fmt.Errorf("failed to create watcher for primary ES client's password: %w", err)
}
f.watchers = append(f.watchers, primaryWatcher)
}

if f.archiveConfig.Enabled {
archiveClient, err := f.newClientFn(f.archiveConfig, logger, metricsFactory)
if err != nil {
return fmt.Errorf("failed to create archive Elasticsearch client: %w", err)
}
f.archiveClient.Store(&archiveClient)

if f.archiveConfig.Authentication.BasicAuthentication.PasswordFilePath != "" {
archiveWatcher, err := fswatcher.New([]string{f.archiveConfig.Authentication.BasicAuthentication.PasswordFilePath}, f.onArchivePasswordChange, f.logger)
if err != nil {
return fmt.Errorf("failed to create watcher for archive ES client's password: %w", err)
}
f.watchers = append(f.watchers, archiveWatcher)
}
}

return nil
}

func (f *Factory) getPrimaryClient() es.Client {
if c := f.primaryClient.Load(); c != nil {
return *c
}
return nil
}

func (f *Factory) getArchiveClient() es.Client {
if c := f.archiveClient.Load(); c != nil {
if c := f.client.Load(); c != nil {
return *c
}
return nil
}

// CreateSpanReader implements storage.Factory
func (f *Factory) CreateSpanReader() (spanstore.Reader, error) {
return createSpanReader(f.getPrimaryClient, f.primaryConfig, false, f.metricsFactory, f.logger, f.tracer)
return createSpanReader(f.getPrimaryClient, f.config, f.metricsFactory, f.logger, f.tracer)
}

// CreateSpanWriter implements storage.Factory
func (f *Factory) CreateSpanWriter() (spanstore.Writer, error) {
return createSpanWriter(f.getPrimaryClient, f.primaryConfig, false, f.metricsFactory, f.logger)
return createSpanWriter(f.getPrimaryClient, f.config, false, f.metricsFactory, f.logger)
}

// CreateDependencyReader implements storage.Factory
func (f *Factory) CreateDependencyReader() (dependencystore.Reader, error) {
return createDependencyReader(f.getPrimaryClient, f.primaryConfig, f.logger)
}

// CreateArchiveSpanReader implements storage.ArchiveFactory
func (f *Factory) CreateArchiveSpanReader() (spanstore.Reader, error) {
if !f.archiveConfig.Enabled {
return nil, nil
}
return createSpanReader(f.getArchiveClient, f.archiveConfig, true, f.metricsFactory, f.logger, f.tracer)
}

// CreateArchiveSpanWriter implements storage.ArchiveFactory
func (f *Factory) CreateArchiveSpanWriter() (spanstore.Writer, error) {
if !f.archiveConfig.Enabled {
return nil, nil
}
return createSpanWriter(f.getArchiveClient, f.archiveConfig, true, f.metricsFactory, f.logger)
return createDependencyReader(f.getPrimaryClient, f.config, f.logger)
}

func createSpanReader(
clientFn func() es.Client,
cfg *config.Configuration,
archive bool,
mFactory metrics.Factory,
logger *zap.Logger,
tp trace.TracerProvider,
Expand All @@ -229,7 +179,7 @@ func createSpanReader(
ServiceIndex: cfg.Indices.Services,
TagDotReplacement: cfg.Tags.DotReplacement,
UseReadWriteAliases: cfg.UseReadWriteAliases,
Archive: archive,
Archive: cfg.IsArchive,
RemoteReadClusters: cfg.RemoteReadClusters,
Logger: logger,
MetricsFactory: mFactory,
Expand Down Expand Up @@ -287,16 +237,16 @@ func (f *Factory) CreateSamplingStore(int /* maxBuckets */) (samplingstore.Store
params := esSampleStore.Params{
Client: f.getPrimaryClient,
Logger: f.logger,
IndexPrefix: f.primaryConfig.Indices.IndexPrefix,
IndexDateLayout: f.primaryConfig.Indices.Sampling.DateLayout,
IndexRolloverFrequency: config.RolloverFrequencyAsNegativeDuration(f.primaryConfig.Indices.Sampling.RolloverFrequency),
Lookback: f.primaryConfig.AdaptiveSamplingLookback,
MaxDocCount: f.primaryConfig.MaxDocCount,
IndexPrefix: f.config.Indices.IndexPrefix,
IndexDateLayout: f.config.Indices.Sampling.DateLayout,
IndexRolloverFrequency: config.RolloverFrequencyAsNegativeDuration(f.config.Indices.Sampling.RolloverFrequency),
Lookback: f.config.AdaptiveSamplingLookback,
MaxDocCount: f.config.MaxDocCount,
}
store := esSampleStore.NewSamplingStore(params)

if f.primaryConfig.CreateIndexTemplates && !f.primaryConfig.UseILM {
mappingBuilder := mappingBuilderFromConfig(f.primaryConfig)
if f.config.CreateIndexTemplates && !f.config.UseILM {
mappingBuilder := mappingBuilderFromConfig(f.config)
samplingMapping, err := mappingBuilder.GetSamplingMappings()
if err != nil {
return nil, err
Expand Down Expand Up @@ -344,19 +294,12 @@ func (f *Factory) Close() error {
errs = append(errs, w.Close())
}
errs = append(errs, f.getPrimaryClient().Close())
if client := f.getArchiveClient(); client != nil {
errs = append(errs, client.Close())
}

return errors.Join(errs...)
}

func (f *Factory) onPrimaryPasswordChange() {
f.onClientPasswordChange(f.primaryConfig, &f.primaryClient)
}

func (f *Factory) onArchivePasswordChange() {
f.onClientPasswordChange(f.archiveConfig, &f.archiveClient)
f.onClientPasswordChange(f.config, &f.client)
}

func (f *Factory) onClientPasswordChange(cfg *config.Configuration, client *atomic.Pointer[es.Client]) {
Expand Down
38 changes: 3 additions & 35 deletions plugin/storage/es/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,6 @@ var defaultIndexOptions = config.IndexOptions{
// (e.g. archive) may be underspecified and infer the rest of its parameters from primary.
type Options struct {
Primary namespaceConfig `mapstructure:",squash"`

others map[string]*namespaceConfig
}

type namespaceConfig struct {
Expand All @@ -95,24 +93,14 @@ type namespaceConfig struct {
}

// NewOptions creates a new Options struct.
func NewOptions(primaryNamespace string, otherNamespaces ...string) *Options {
func NewOptions(namespace string) *Options {
// TODO all default values should be defined via cobra flags
defaultConfig := DefaultConfig()
options := &Options{
Primary: namespaceConfig{
Configuration: defaultConfig,
namespace: primaryNamespace,
},
others: make(map[string]*namespaceConfig, len(otherNamespaces)),
}

// Other namespaces need to be explicitly enabled.
defaultConfig.Enabled = false
for _, namespace := range otherNamespaces {
options.others[namespace] = &namespaceConfig{
Configuration: defaultConfig,
namespace: namespace,
}
},
}

return options
Expand All @@ -127,9 +115,6 @@ func (cfg *namespaceConfig) getTLSFlagsConfig() tlscfg.ClientFlagsConfig {
// AddFlags adds flags for Options
func (opt *Options) AddFlags(flagSet *flag.FlagSet) {
addFlags(flagSet, &opt.Primary)
for _, cfg := range opt.others {
addFlags(flagSet, cfg)
}
}

func addFlags(flagSet *flag.FlagSet, nsConfig *namespaceConfig) {
Expand Down Expand Up @@ -286,7 +271,7 @@ func addFlags(flagSet *flag.FlagSet, nsConfig *namespaceConfig) {
nsConfig.namespace+suffixAdaptiveSamplingLookback,
nsConfig.AdaptiveSamplingLookback,
"How far back to look for the latest adaptive sampling probabilities")
if nsConfig.namespace == archiveNamespace {
if nsConfig.namespace == ArchiveNamespace {
flagSet.Bool(
nsConfig.namespace+suffixEnabled,
nsConfig.Enabled,
Expand All @@ -305,9 +290,6 @@ func addFlags(flagSet *flag.FlagSet, nsConfig *namespaceConfig) {
// InitFromViper initializes Options with properties from viper
func (opt *Options) InitFromViper(v *viper.Viper) {
initFromViper(&opt.Primary, v)
for _, cfg := range opt.others {
initFromViper(cfg, v)
}
}

func initFromViper(cfg *namespaceConfig, v *viper.Viper) {
Expand Down Expand Up @@ -392,20 +374,6 @@ func (opt *Options) GetPrimary() *config.Configuration {
return &opt.Primary.Configuration
}

// Get returns auxiliary named configuration.
func (opt *Options) Get(namespace string) *config.Configuration {
nsCfg, ok := opt.others[namespace]
if !ok {
nsCfg = &namespaceConfig{}
opt.others[namespace] = nsCfg
}
nsCfg.Configuration.ApplyDefaults(&opt.Primary.Configuration)
if len(nsCfg.Configuration.Servers) == 0 {
nsCfg.Servers = opt.Primary.Servers
}
return &nsCfg.Configuration
}

// stripWhiteSpace removes all whitespace characters from a string
func stripWhiteSpace(str string) string {
return strings.ReplaceAll(str, " ", "")
Expand Down
2 changes: 1 addition & 1 deletion plugin/storage/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func (*Factory) getFactoryOfType(factoryType string) (storage.Factory, error) {
case cassandraStorageType:
return cassandra.NewFactory(), nil
case elasticsearchStorageType, opensearchStorageType:
return es.NewFactory(), nil
return es.NewFactory(es.PrimaryNamespace), nil
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@yurishkuro can you take another look at the changes? I minimized the code movements to simply remove the others field from Options. The problem now is that this isn't initializing the CLI flags for es-archive. Any thoughts on how to get around this?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

see #6156 (comment)

The query service instantiates just a single factory and casts it to ArchiveFactory. With your changes (which are in the right direction, but of insufficient scope) it never gets a chance to create archive CLI flags, because that has to happen via different factories.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@yurishkuro is the archive factory only needed for the query service?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes

case memoryStorageType:
return memory.NewFactory(), nil
case kafkaStorageType:
Expand Down
Loading