README.md: 12 changes (7 additions, 5 deletions)
@@ -65,13 +65,13 @@ java -jar /home/java-iceberg-cli/target/<jar_name> --help
### CLI
Pass the Hive Metastore URI using the -u or --uri option, and optionally specify a storage path when creating a new namespace using the -w or --warehouse option. By default, the default FS value from Hive's configuration file is used as the warehouse path.
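For orientation, here is a minimal sketch of how these two flags typically reach a Hive catalog through the Iceberg API; the URI and warehouse values are hypothetical, and this is not necessarily the CLI's exact wiring:
```
import java.util.HashMap;
import java.util.Map;
import org.apache.iceberg.hive.HiveCatalog;

public class CatalogInitSketch {
    public static void main(String[] args) {
        Map<String, String> properties = new HashMap<>();
        properties.put("uri", "thrift://localhost:9083");      // value of -u/--uri (hypothetical)
        properties.put("warehouse", "s3a://bucket/warehouse"); // value of -w/--warehouse (hypothetical)

        HiveCatalog catalog = new HiveCatalog();
        catalog.initialize("hive", properties); // same call the connector below makes
        System.out.println("Catalog initialized: " + catalog.name());
    }
}
```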

Set credentials for the object store using environment variables. Note that you need to specify AWS_ENDPOINT if using a non-AWS object store.
Set credentials for the object store using environment variables. Note that you need to specify ENDPOINT if using a non-AWS object store.
```
export AWS_ACCESS_KEY_ID=
export AWS_SECRET_ACCESS_KEY=
export AWS_REGION=
# specify endpoint to a non-AWS object store, if applicable
export AWS_ENDPOINT=
export ENDPOINT=
```

> Collaborator: Should we change it to ENDPOINT_URL?

Credentials can also be passed to the CLI as:
@@ -86,7 +86,7 @@
```
export AWS_ACCESS_KEY_ID=
export AWS_SECRET_ACCESS_KEY=
export AWS_REGION=
# specify endpoint to a non-AWS object store, if applicable
export AWS_ENDPOINT=
export ENDPOINT=
```

> Collaborator: Same as above.

```
export URI=
export WAREHOUSE=
```
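To see where these variables end up, here is a sketch that forwards them into a catalog/FileIO properties map. The s3.* keys follow Iceberg's S3FileIO property naming, which may vary by version; treat the mapping as an assumption rather than this CLI's actual code:
```
import java.util.HashMap;
import java.util.Map;

public class CredentialSketch {
    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put("s3.access-key-id", System.getenv("AWS_ACCESS_KEY_ID"));
        props.put("s3.secret-access-key", System.getenv("AWS_SECRET_ACCESS_KEY"));

        String endpoint = System.getenv("ENDPOINT");
        if (endpoint != null) {
            props.put("s3.endpoint", endpoint); // only needed for non-AWS object stores
        }
        System.out.println("Resolved " + props.size() + " storage properties");
    }
}
```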
@@ -105,7 +105,7 @@
```
usage: java -jar <jar_name> [options] command [args]
-c,--credential <credentials> Supported credentials : AWS
--catalog <value> Read properties for this catalog from
the config file
--format <iceberg|hive> The format of the table we want to
-m,--format <iceberg|hive> The format of the table we want to
display
-h,--help Show this help message
-o,--output <console|csv|json> Show output in this format
@@ -117,12 +117,13 @@ Commands:
drop Drop a table or a namespace
schema Fetch schema of a table
metadata Get table metadata
recordcount Get total number of records in a table
read Read from a table
commit Commit file(s) to a table
rewrite Rewrite file(s) in a table
list List tables or namespaces
type Fetch table type
uuid Fetch uuid of a table
rewrite Rewrite (replace) file(s) in a table
spec Fetch partition spec of a table
rename Rename a table
create Create a table or a namespace
@@ -132,6 +133,7 @@ Commands:
write Write to a table
snapshot Fetch latest or all snapshot(s) of a table
tasks List scan tasks of a table
alter Alter a table
```
Each subcommand provides a help message of its own.
docs/sample_cli_commands.md: 7 changes (7 additions, 0 deletions)
@@ -182,11 +182,18 @@ table {
<uuid>
```

* Get record count of a table. Table *test_table* in namespace *test* in this example.
```
% java -jar <jar> -u <uri> recordcount test.test_table
<record count>
```

### Details

* Get details of a table. Table *test_table* in namespace *test* in this example.
```
% java -jar <jar> -u <uri> describe test.test_table
TOTAL ESTIMATED RECORDS : 1
TOTAL TASKS: 1
TOTAL FILES IN TASK 0 : 1
DATA <location> PARQUET 0 2000 [] true
@@ -121,6 +121,9 @@ else if (namespace != null)
case "metadata":
output = printUtils.printTableMetadata();
break;
case "recordcount":
output = printUtils.printRecordCount();
break;
case "tasks":
output = printUtils.printTasks();
break;
@@ -130,6 +130,15 @@ private void initCatalog(CustomCatalog catalog) throws IOException {
m_catalog.initialize("hive", properties);
}

private void loadScan() {
// Use snapshot passed by the user.
// By default, use the latest snapshot.
m_scan = iceberg_table.newScan();
if (m_snapshotId != null) {
m_scan = m_scan.useSnapshot(m_snapshotId);
}
}
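Worth noting: a TableScan is bound to the table state at the moment newScan() is called, which is why this PR re-runs loadScan() after schema changes, commits, and rewrites below. A minimal sketch of the same pattern, assuming an already-loaded org.apache.iceberg.Table:
```
import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;

class ScanSketch {
    // Build a scan pinned to a snapshot, or one bound to the current table state.
    static TableScan scanFor(Table table, Long snapshotId) {
        TableScan scan = table.newScan();        // bound to the table state at this moment
        if (snapshotId != null) {
            scan = scan.useSnapshot(snapshotId); // time-travel to the requested snapshot
        }
        return scan;
    }
}
```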

public void setTableIdentifier(String namespace, String tableName) {
m_tableIdentifier = TableIdentifier.of(namespace, tableName);
}
@@ -150,13 +159,7 @@ public Table loadTable(TableIdentifier identifier) {

public void loadTable() {
iceberg_table = loadTable(m_tableIdentifier);

// Use snapshot passed by the user.
// By default, use the latest snapshot.
m_scan = iceberg_table.newScan();
if (m_snapshotId != null) {
m_scan = m_scan.useSnapshot(m_snapshotId);
}
loadScan();
}

public boolean createTable(Schema schema, PartitionSpec spec, boolean overwrite) {
Expand Down Expand Up @@ -279,6 +282,9 @@ public boolean alterTable(String newSchema) throws Exception {

// all good - commit changes
updateSchema.commit();
// Reload table scan
loadScan();

return true;
}

@@ -287,7 +293,11 @@ public boolean dropTable() {
loadTable();

System.out.println("Dropping the table " + m_tableIdentifier);
if (m_catalog.dropTable(m_tableIdentifier)) {
boolean status = m_catalog.dropTable(m_tableIdentifier);
// Reload table scan
loadScan();

if (status) {
System.out.println("Table dropped successfully");
return true;
}
@@ -334,9 +344,12 @@ public Map<Integer, List<Map<String, String>>> getPlanFiles() {
List<Map<String, String>> taskMapList = new ArrayList<Map<String, String>>();
Map<String, String> taskMap = new HashMap<String, String>();
DataFile file = scanTask.file();
Long recordCount = file.recordCount();
this.fileRecordCount += recordCount;
taskMap.put("content", file.content().toString());
taskMap.put("file_path", file.path().toString());
taskMap.put("file_format", file.format().toString());
taskMap.put("record_count", Long.toString(recordCount));
taskMap.put("start", Long.toString(scanTask.start()));
taskMap.put("length", Long.toString(scanTask.length()));
taskMap.put("spec", scanTask.spec().toString());
@@ -365,9 +378,12 @@ public Map<Integer, List<Map<String, String>>> getPlanTasks() {
for (FileScanTask fileTask : scanTask.files()) {
Map<String, String> taskMap = new HashMap<String, String>();
DataFile file = fileTask.file();
Long recordCount = fileTask.estimatedRowsCount();
this.taskRecordCount += recordCount;
taskMap.put("content", file.content().toString());
taskMap.put("file_path", file.path().toString());
taskMap.put("file_format", file.format().toString());
taskMap.put("record_count", Long.toString(recordCount));
taskMap.put("start", Long.toString(fileTask.start()));
taskMap.put("length", Long.toString(fileTask.length()));
taskMap.put("spec", fileTask.spec().toString());
@@ -455,6 +471,30 @@ public String getUUID() {
TableMetadata metadata = ((HasTableOperations) iceberg_table).operations().current();
return metadata.uuid();
}

public Long getFileRecordCount() {
    if (iceberg_table == null)
        loadTable();

    // Reset the running total so repeated calls do not double count
    fileRecordCount = 0L;

    Iterable<FileScanTask> scanTasks = m_scan.planFiles();
    for (FileScanTask scanTask : scanTasks) {
        fileRecordCount += scanTask.file().recordCount();
    }

    return fileRecordCount;
}

public Long getTaskRecordCount() {
    if (iceberg_table == null)
        loadTable();

    // Reset the running total so repeated calls do not double count
    taskRecordCount = 0L;

    Iterable<CombinedScanTask> scanTasks = m_scan.planTasks();
    for (CombinedScanTask scanTask : scanTasks) {
        taskRecordCount += scanTask.files().stream().mapToLong(f -> f.estimatedRowsCount()).sum();
    }

    return taskRecordCount;
}
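The two getters intentionally differ: DataFile.recordCount() is exact per-file metadata from the manifests, while estimatedRowsCount() is a per-task estimate (scaled when a file is split across tasks), so the totals can diverge. A self-contained sketch of both strategies, assuming a loaded Table named table (in production code the iterables returned by planFiles/planTasks are CloseableIterable and should be closed):
```
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Table;

class RecordCountSketch {
    // Exact count from manifest metadata, one entry per data file.
    static long fileRecordCount(Table table) {
        long total = 0L;
        for (FileScanTask task : table.newScan().planFiles()) {
            total += task.file().recordCount();
        }
        return total;
    }

    // Estimated count per planned task; may differ when files are split across tasks.
    static long taskRecordCount(Table table) {
        long total = 0L;
        for (CombinedScanTask combined : table.newScan().planTasks()) {
            for (FileScanTask task : combined.files()) {
                total += task.estimatedRowsCount();
            }
        }
        return total;
    }
}
```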

public Snapshot getCurrentSnapshot() {
if (iceberg_table == null)
@@ -734,6 +774,9 @@ public boolean commitTable(String dataFiles) throws Exception {
io.close();
System.out.println("Txn Complete!");

// Reload table scan after a commit
loadScan();

return true;
}

@@ -765,6 +808,9 @@ public boolean rewriteFiles(String dataFiles) throws Exception {
transaction.commitTransaction();
io.close();
System.out.println("Txn Complete!");

// Reload table scan after a commit
loadScan();

return true;
}
@@ -35,6 +35,8 @@
public abstract class MetastoreConnector
{
protected Long m_snapshotId = null;
protected Long fileRecordCount = 0L;
protected Long taskRecordCount = 0L;

public MetastoreConnector(CustomCatalog catalog, String namespace, String tableName, Credentials creds) {
}
@@ -97,6 +99,14 @@ public void setSnapshotId(Long snapshotId) {
this.m_snapshotId = snapshotId;
}

public Long getFileRecordCount() {
return fileRecordCount;
}

public Long getTaskRecordCount() {
return taskRecordCount;
}

@SuppressWarnings("serial")
class TableNotFoundException extends RuntimeException {
public TableNotFoundException(String message) {
@@ -60,7 +60,7 @@ public void parseOptions(Map<String, Command> commands, String[] args) throws Pa
options.addOption(Option.builder("w").longOpt("warehouse").argName("value").hasArg().desc("Table location").build());
options.addOption(Option.builder("o").longOpt("output").argName("console|csv|json").hasArg().desc("Show output in this format").build());
options.addOption(Option.builder().longOpt("catalog").argName("value").hasArg().desc("Read properties for this catalog from the config file").build());
options.addOption(Option.builder().longOpt("format").argName("iceberg|hive").hasArg().desc("The format of the table we want to display").build());
options.addOption(Option.builder("m").longOpt("format").argName("iceberg|hive").hasArg().desc("The format of the table we want to display").build());
options.addOption(Option.builder().longOpt("snapshot").argName("snapshot ID").hasArg().desc("Snapshot ID to use").build());

CommandLineParser parser = new DefaultParser();
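Since the --format option now also has the short name -m, Commons CLI treats both spellings as the same option. A minimal, self-contained sketch (argument values hypothetical):
```
import org.apache.commons.cli.*;

class FormatOptionSketch {
    public static void main(String[] args) throws ParseException {
        Options options = new Options();
        options.addOption(Option.builder("m").longOpt("format").argName("iceberg|hive")
                .hasArg().desc("The format of the table we want to display").build());

        // "-m iceberg" and "--format iceberg" now parse identically.
        CommandLine cmd = new DefaultParser().parse(options, new String[] {"-m", "iceberg"});
        System.out.println(cmd.getOptionValue("format")); // prints "iceberg"
    }
}
```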
@@ -119,6 +119,11 @@ private void initializeCommands() {
metadata.addArgument("identifier", "Table identifier", true);
m_commands.put("metadata", metadata);

Command recordcount = new Command("recordcount", "Get total number of records in a table");
recordcount.addOption("--help", "Show this help message and exit");
recordcount.addArgument("identifier", "Table identifier", true);
m_commands.put("recordcount", recordcount);

Command read = new Command("read", "Read from a table");
read.addOption("--help", "Show this help message and exit");
read.addArgument("identifier", "Table identifier", true);
@@ -57,19 +57,30 @@ public String printTableMetadata() throws Exception {
return output.tableMetadata(snapshot, targetSchema, tableLocation, dataLocation, type);
}

/**
* Get total record count from MetastoreConnector and return it to the user in the given format
* @throws Exception
*/
public String printRecordCount() throws Exception {
Long totalEstimatedRecords = metaConn.getFileRecordCount();

return output.tableRecordCount(totalEstimatedRecords);
}

/**
* Get table details from MetastoreConnector and output it to the user in the given format
* @throws Exception
*/
public String printTableDetails() throws Exception {
Map<Integer, List<Map<String, String>>> planFileTasks = metaConn.getPlanFiles();
Long totalEstimatedRecords = metaConn.getFileRecordCount();
Snapshot snapshot = metaConn.getCurrentSnapshot();
Schema targetSchema = metaConn.getTableSchema();
String tableLocation = metaConn.getTableLocation();
String dataLocation = metaConn.getTableDataLocation();
String type = metaConn.getTableType();

return output.tableDetails(planFileTasks, snapshot, targetSchema, tableLocation, dataLocation, type);
return output.tableDetails(planFileTasks, snapshot, targetSchema, tableLocation, dataLocation, type, totalEstimatedRecords);
}

/**
@@ -142,7 +153,9 @@ public String printUUID() throws Exception {
public String printFiles() throws Exception {
String outputString = null;

String planFiles = output.tableFiles(metaConn.getPlanFiles());
Map<Integer, List<Map<String, String>>> taskMapList = metaConn.getPlanFiles();
Long totalEstimatedRecords = metaConn.getFileRecordCount();
String planFiles = output.tableFiles(taskMapList, totalEstimatedRecords);
Long snapshotId = metaConn.getCurrentSnapshotId();
switch (format) {
case "json":
@@ -165,7 +178,9 @@ public String printTasks() throws Exception {
public String printTasks() throws Exception {
String outputString = null;

String planFiles = output.tableFiles(metaConn.getPlanTasks());
Map<Integer, List<Map<String, String>>> taskMapList = metaConn.getPlanTasks();
Long totalEstimatedRecords = metaConn.getTaskRecordCount();
String planFiles = output.tableFiles(taskMapList, totalEstimatedRecords);
Long snapshotId = metaConn.getCurrentSnapshotId();
switch (format) {
case "json":
@@ -14,6 +14,10 @@
import iceberg_cli.utils.DataConversion;

public class CsvOutput extends Output {

public CsvOutput() {
this.delimiter = ',';
}

@Override
public String listTables(List<String> tables) throws Exception {
@@ -47,34 +51,6 @@ public String namespaceDetails(Map<String,String> details) throws Exception {
return builder.toString();
}

@Override
public String tableFiles(Map<Integer, List<Map<String, String>>> planFileTasks) throws Exception {
StringBuilder builder = new StringBuilder();

// Add data files
char delim = ',';
if (planFileTasks != null) {
builder.append(String.format("TOTAL TASKS : %d\n", planFileTasks.size()));
for (Map.Entry<Integer, List<Map<String, String>>> entry : planFileTasks.entrySet()) {
builder.append(String.format("TOTAL FILES IN TASK %d : %d\n", entry.getKey(),entry.getValue().size()));
for (Map<String, String> task : entry.getValue()) {
String taskInfo = String.format("%s%c%s%c%s%c%s%c%s%c%s%c%s",
task.get("content"), delim,
task.get("file_path"), delim,
task.get("file_format"), delim,
task.get("start"), delim,
task.get("length"), delim,
task.get("spec"), delim,
task.get("residual")
);
builder.append(String.format("%s\n", taskInfo));
}
}
}

return builder.toString();
}
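Removing this override suggests the CSV formatting now lives in a shared Output.tableFiles(...) implementation driven by the delimiter field set in the constructor above. A sketch of that pattern with assumed names (the base class is not shown in this diff):
```
// Assumed shape of the shared implementation; names are illustrative only.
abstract class OutputSketch {
    protected char delimiter = ' ';

    String row(String... cells) {
        return String.join(String.valueOf(delimiter), cells);
    }
}

class CsvOutputSketch extends OutputSketch {
    CsvOutputSketch() {
        this.delimiter = ','; // same idea as the CsvOutput constructor above
    }
}
```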

@Override
public String allSnapshots(java.lang.Iterable<Snapshot> snapshots) throws Exception {
return DataConversion.snapshotsAsCsv(snapshots);