Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[AMORO-3406] A Stateless table manager implement #3407

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,10 @@
import org.apache.amoro.server.resource.ContainerMetadata;
import org.apache.amoro.server.resource.OptimizerManager;
import org.apache.amoro.server.resource.ResourceContainers;
import org.apache.amoro.server.table.DefaultTableManager;
import org.apache.amoro.server.table.DefaultTableService;
import org.apache.amoro.server.table.RuntimeHandlerChain;
import org.apache.amoro.server.table.TableManager;
import org.apache.amoro.server.table.TableService;
import org.apache.amoro.server.table.executor.AsyncTableExecutors;
import org.apache.amoro.server.terminal.TerminalManager;
Expand Down Expand Up @@ -95,7 +97,8 @@ public class AmoroServiceContainer {
private final HighAvailabilityContainer haContainer;
private DataSource dataSource;
private CatalogManager catalogManager;
private DefaultTableService tableService;
private TableManager tableManager;
private TableService tableService;
private DefaultOptimizingService optimizingService;
private TerminalManager terminalManager;
private Configurations serviceConfig;
Expand Down Expand Up @@ -150,8 +153,11 @@ public void startService() throws Exception {
MetricManager.getInstance();

catalogManager = new DefaultCatalogManager(serviceConfig);
tableManager = new DefaultTableManager(serviceConfig, catalogManager);

tableService = new DefaultTableService(serviceConfig, catalogManager);
optimizingService = new DefaultOptimizingService(serviceConfig, catalogManager, tableService);
optimizingService =
new DefaultOptimizingService(serviceConfig, catalogManager, tableManager, tableService);

LOG.info("Setting up AMS table executors...");
AsyncTableExecutors.getInstance().setup(tableService, serviceConfig);
Expand All @@ -168,7 +174,8 @@ public void startService() throws Exception {
addHandlerChain(AsyncTableExecutors.getInstance().getTagsAutoCreatingExecutor());
tableService.initialize();
LOG.info("AMS table service have been initialized");
terminalManager = new TerminalManager(serviceConfig, catalogManager, tableService);
tableManager.setTableService(tableService);
terminalManager = new TerminalManager(serviceConfig, catalogManager);

initThriftService();
startThriftService();
Expand Down Expand Up @@ -245,8 +252,13 @@ private void startThriftServer(TServer server, String threadName) {
private void initHttpService() {
DashboardServer dashboardServer =
new DashboardServer(
serviceConfig, catalogManager, tableService, optimizingService, terminalManager);
RestCatalogService restCatalogService = new RestCatalogService(catalogManager, tableService);
serviceConfig,
catalogManager,
tableManager,
optimizingService,
terminalManager,
tableService);
RestCatalogService restCatalogService = new RestCatalogService(catalogManager, tableManager);

httpServer =
Javalin.create(
Expand Down Expand Up @@ -338,7 +350,7 @@ private void initThriftService() throws TTransportException {
new AmoroTableMetastore.Processor<>(
ThriftServiceProxy.createProxy(
AmoroTableMetastore.Iface.class,
new TableManagementService(catalogManager, tableService),
new TableManagementService(catalogManager, tableManager),
AmoroRuntimeException::normalizeCompatibly));
tableManagementServer =
createThriftServer(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
import org.apache.amoro.server.resource.OptimizerManager;
import org.apache.amoro.server.resource.OptimizerThread;
import org.apache.amoro.server.resource.QuotaProvider;
import org.apache.amoro.server.table.DefaultTableService;
import org.apache.amoro.server.table.MaintainedTableManager;
import org.apache.amoro.server.table.RuntimeHandlerChain;
import org.apache.amoro.server.table.TableRuntime;
import org.apache.amoro.server.table.TableService;
Expand Down Expand Up @@ -100,20 +100,23 @@ public class DefaultOptimizingService extends StatedPersistentBase
private final OptimizerKeeper optimizerKeeper = new OptimizerKeeper();
private final CatalogManager catalogManager;
private final TableService tableService;
private final MaintainedTableManager tableManager;
private final RuntimeHandlerChain tableHandlerChain;
private final ExecutorService planExecutor;

public DefaultOptimizingService(
Configurations serviceConfig,
CatalogManager catalogManager,
DefaultTableService tableService) {
MaintainedTableManager tableManager,
TableService tableService) {
this.optimizerTouchTimeout = serviceConfig.getLong(AmoroManagementConf.OPTIMIZER_HB_TIMEOUT);
this.taskAckTimeout = serviceConfig.getLong(AmoroManagementConf.OPTIMIZER_TASK_ACK_TIMEOUT);
this.maxPlanningParallelism =
serviceConfig.getInteger(AmoroManagementConf.OPTIMIZER_MAX_PLANNING_PARALLELISM);
this.pollingTimeout = serviceConfig.getLong(AmoroManagementConf.OPTIMIZER_POLLING_TIMEOUT);
this.tableService = tableService;
this.catalogManager = catalogManager;
this.tableManager = tableManager;
this.tableHandlerChain = new TableRuntimeHandlerImpl();
this.planExecutor =
Executors.newCachedThreadPool(
Expand All @@ -140,7 +143,7 @@ private void loadOptimizingQueues(List<TableRuntime> tableRuntimeMetaList) {
List<TableRuntime> tableRuntimes = groupToTableRuntimes.remove(groupName);
OptimizingQueue optimizingQueue =
new OptimizingQueue(
tableService,
catalogManager,
group,
this,
planExecutor,
Expand Down Expand Up @@ -317,7 +320,7 @@ public void createResourceGroup(ResourceGroup resourceGroup) {
doAs(ResourceMapper.class, mapper -> mapper.insertResourceGroup(resourceGroup));
OptimizingQueue optimizingQueue =
new OptimizingQueue(
tableService,
catalogManager,
resourceGroup,
this,
planExecutor,
Expand Down Expand Up @@ -414,7 +417,7 @@ public boolean canDeleteResourceGroup(String name) {
return false;
}
}
for (ServerTableIdentifier identifier : tableService.listManagedTables()) {
for (ServerTableIdentifier identifier : tableManager.listManagedTables()) {
if (optimizingQueueByGroup.containsKey(name)
&& optimizingQueueByGroup.get(name).containsTable(identifier)) {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@
import org.apache.amoro.server.catalog.ServerCatalog;
import org.apache.amoro.server.manager.EventsManager;
import org.apache.amoro.server.persistence.PersistentBase;
import org.apache.amoro.server.table.TableService;
import org.apache.amoro.server.table.internal.InternalTableCreator;
import org.apache.amoro.server.table.internal.InternalTableHandler;
import org.apache.amoro.server.table.internal.InternalTableManager;
import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
Expand Down Expand Up @@ -108,11 +108,11 @@ public class RestCatalogService extends PersistentBase {
private final JavalinJackson jsonMapper;

private final CatalogManager catalogManager;
private final TableService tableService;
private final InternalTableManager tableManager;

public RestCatalogService(CatalogManager catalogManager, TableService tableService) {
public RestCatalogService(CatalogManager catalogManager, InternalTableManager tableManager) {
this.catalogManager = catalogManager;
this.tableService = tableService;
this.tableManager = tableManager;
ObjectMapper objectMapper = jsonMapper();
this.jsonMapper = new JavalinJackson(objectMapper);
}
Expand Down Expand Up @@ -286,7 +286,7 @@ public void createTable(Context ctx) {
catalog.newTableCreator(database, tableName, format, request)) {
try {
org.apache.amoro.server.table.TableMetadata metadata = creator.create();
tableService.createTable(catalog.name(), metadata);
tableManager.createTable(catalog.name(), metadata);
} catch (RuntimeException e) {
creator.rollback();
throw e;
Expand Down Expand Up @@ -347,7 +347,8 @@ public void deleteTable(Context ctx) {
Boolean.parseBoolean(
Optional.ofNullable(ctx.req.getParameter("purgeRequested")).orElse("false"));
org.apache.amoro.server.table.TableMetadata tableMetadata = handler.tableMetadata();
tableService.dropTableMetadata(tableMetadata.getTableIdentifier().getIdentifier(), purge);
tableManager.dropTableMetadata(
tableMetadata.getTableIdentifier().getIdentifier().buildTableIdentifier(), purge);
handler.dropTable(purge);
return null;
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@
import org.apache.amoro.server.catalog.CatalogManager;
import org.apache.amoro.server.catalog.InternalCatalog;
import org.apache.amoro.server.catalog.ServerCatalog;
import org.apache.amoro.server.table.TableManager;
import org.apache.amoro.server.table.TableMetadata;
import org.apache.amoro.server.table.TableService;
import org.apache.amoro.server.utils.InternalTableUtil;
import org.apache.amoro.shade.thrift.org.apache.thrift.TException;

Expand All @@ -44,11 +44,11 @@
public class TableManagementService implements AmoroTableMetastore.Iface {

private final CatalogManager catalogManager;
private final TableService tableService;
private final TableManager tableManager;

public TableManagementService(CatalogManager catalogManager, TableService tableService) {
public TableManagementService(CatalogManager catalogManager, TableManager tableManager) {
this.catalogManager = catalogManager;
this.tableService = tableService;
this.tableManager = tableManager;
}

@Override
Expand Down Expand Up @@ -89,11 +89,14 @@ public void createTableMeta(TableMeta tableMeta) {
}
ServerTableIdentifier identifier =
ServerTableIdentifier.of(
tableMeta.getTableIdentifier(), TableFormat.valueOf(tableMeta.getFormat()));
tableMeta.getTableIdentifier().getCatalog(),
tableMeta.getTableIdentifier().getDatabase(),
tableMeta.getTableIdentifier().getTableName(),
TableFormat.valueOf(tableMeta.getFormat()));
InternalCatalog catalog = catalogManager.getInternalCatalog(identifier.getCatalog());
CatalogMeta catalogMeta = catalog.getMetadata();
TableMetadata tableMetadata = new TableMetadata(identifier, tableMeta, catalogMeta);
tableService.createTable(catalog.name(), tableMetadata);
tableManager.createTable(catalog.name(), tableMetadata);
}

@Override
Expand Down Expand Up @@ -124,7 +127,7 @@ public TableMeta getTable(TableIdentifier tableIdentifier) {

@Override
public void removeTable(TableIdentifier tableIdentifier, boolean deleteData) {
tableService.dropTableMetadata(tableIdentifier, deleteData);
tableManager.dropTableMetadata(tableIdentifier, deleteData);
}

@Override
Expand All @@ -142,22 +145,22 @@ public Blocker block(
List<BlockableOperation> operations,
Map<String, String> properties)
throws OperationConflictException {
return tableService.block(tableIdentifier, operations, properties);
return tableManager.block(tableIdentifier, operations, properties);
}

@Override
public void releaseBlocker(TableIdentifier tableIdentifier, String blockerId) {
tableService.releaseBlocker(tableIdentifier, blockerId);
tableManager.releaseBlocker(tableIdentifier, blockerId);
}

@Override
public long renewBlocker(TableIdentifier tableIdentifier, String blockerId)
throws NoSuchObjectException {
return tableService.renewBlocker(tableIdentifier, blockerId);
return tableManager.renewBlocker(tableIdentifier, blockerId);
}

@Override
public List<Blocker> getBlockers(TableIdentifier tableIdentifier) {
return tableService.getBlockers(tableIdentifier);
return tableManager.getBlockers(tableIdentifier);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@

package org.apache.amoro.server.catalog;

import org.apache.amoro.AmoroTable;
import org.apache.amoro.api.CatalogMeta;
import org.apache.amoro.table.TableIdentifier;

import java.util.List;

Expand Down Expand Up @@ -90,4 +92,12 @@ public interface CatalogManager {
* @param catalogMeta The CatalogMeta object representing the updated catalog information.
*/
void updateCatalog(CatalogMeta catalogMeta);

/**
* load a table via server catalog.
*
* @param identifier managed table identifier
* @return managed table.
*/
AmoroTable<?> loadTable(TableIdentifier identifier);
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.amoro.server.catalog;

import org.apache.amoro.AmoroTable;
import org.apache.amoro.api.CatalogMeta;
import org.apache.amoro.config.Configurations;
import org.apache.amoro.exception.AlreadyExistsException;
Expand All @@ -31,6 +32,7 @@
import org.apache.amoro.shade.guava32.com.google.common.cache.CacheLoader;
import org.apache.amoro.shade.guava32.com.google.common.cache.LoadingCache;
import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
import org.apache.amoro.table.TableIdentifier;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -195,6 +197,12 @@ public void updateCatalog(CatalogMeta catalogMeta) {
LOG.info("Update catalog metadata: {}", catalogMeta.getCatalogName());
}

@Override
public AmoroTable<?> loadTable(TableIdentifier identifier) {
ServerCatalog serverCatalog = getServerCatalog(identifier.getCatalog());
return serverCatalog.loadTable(identifier.getDatabase(), identifier.getTableName());
}

private void validateCatalogUpdate(CatalogMeta oldMeta, CatalogMeta newMeta) {
if (!oldMeta.getCatalogType().equals(newMeta.getCatalogType())) {
throw new IllegalMetadataException("Cannot update catalog type");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.amoro.TableFormat;
import org.apache.amoro.TableIDWithFormat;
import org.apache.amoro.api.CatalogMeta;
import org.apache.amoro.api.TableIdentifier;
import org.apache.amoro.exception.AlreadyExistsException;
import org.apache.amoro.exception.IllegalMetadataException;
import org.apache.amoro.exception.ObjectNotExistsException;
Expand All @@ -32,6 +31,7 @@
import org.apache.amoro.server.table.TableMetadata;
import org.apache.amoro.server.table.internal.InternalTableCreator;
import org.apache.amoro.server.table.internal.InternalTableHandler;
import org.apache.amoro.table.TableIdentifier;
import org.apache.iceberg.rest.requests.CreateTableRequest;

import java.util.List;
Expand Down Expand Up @@ -101,11 +101,7 @@ public List<TableIDWithFormat> listTables(String database) {
TableMetaMapper.class,
mapper -> mapper.selectTableIdentifiersByDb(getMetadata().getCatalogName(), database))
.stream()
.map(
sid ->
TableIDWithFormat.of(
org.apache.amoro.table.TableIdentifier.of(sid.getIdentifier()),
sid.getFormat()))
.map(sid -> TableIDWithFormat.of(sid.getIdentifier(), sid.getFormat()))
.collect(Collectors.toList());
}

Expand All @@ -115,11 +111,7 @@ public List<TableIDWithFormat> listTables() {
TableMetaMapper.class,
mapper -> mapper.selectTableIdentifiersByCatalog(getMetadata().getCatalogName()))
.stream()
.map(
sid ->
TableIDWithFormat.of(
org.apache.amoro.table.TableIdentifier.of(sid.getIdentifier()),
sid.getFormat()))
.map(sid -> TableIDWithFormat.of(sid.getIdentifier(), sid.getFormat()))
.collect(Collectors.toList());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.apache.amoro.server.dashboard.controller.VersionController;
import org.apache.amoro.server.dashboard.response.ErrorResponse;
import org.apache.amoro.server.dashboard.utils.ParamSignatureCalculator;
import org.apache.amoro.server.table.TableManager;
import org.apache.amoro.server.table.TableService;
import org.apache.amoro.server.terminal.TerminalManager;
import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
Expand Down Expand Up @@ -98,21 +99,26 @@ public class DashboardServer {
public DashboardServer(
Configurations serviceConfig,
CatalogManager catalogManager,
TableService tableService,
TableManager tableManager,
DefaultOptimizingService optimizerManager,
TerminalManager terminalManager) {
TerminalManager terminalManager,
TableService tableService) {
PlatformFileManager platformFileManager = new PlatformFileManager();
this.catalogController = new CatalogController(catalogManager, platformFileManager);
this.healthCheckController = new HealthCheckController();
this.loginController = new LoginController(serviceConfig);
this.optimizerGroupController = new OptimizerGroupController(tableService, optimizerManager);
// TODO: remove table service from OptimizerGroupController
this.optimizerGroupController =
new OptimizerGroupController(tableManager, tableService, optimizerManager);
this.optimizerController = new OptimizerController(optimizerManager);
this.platformFileInfoController = new PlatformFileInfoController(platformFileManager);
this.settingController = new SettingController(serviceConfig, optimizerManager);
ServerTableDescriptor tableDescriptor =
new ServerTableDescriptor(catalogManager, tableService, serviceConfig);
new ServerTableDescriptor(catalogManager, tableManager, serviceConfig);
// TODO: remove table service from TableController
this.tableController =
new TableController(catalogManager, tableService, tableDescriptor, serviceConfig);
new TableController(
catalogManager, tableManager, tableService, tableDescriptor, serviceConfig);
this.terminalController = new TerminalController(terminalManager);
this.versionController = new VersionController();
this.overviewController = new OverviewController();
Expand Down
Loading
Loading