Skip to main content

easyfabric.load_data_bronze

logging

datetime

DataFrame

config

MSG_CONFIG_NOT_INITIALIZED

MSG_CORRELATION_NO_ROWS

MSG_CORRELATION_OK

MSG_COUNT_ERROR

MSG_FILE_CHANGED

MSG_FILE_UNCHANGED

MSG_FILETYPE_NONE

MSG_FRESHNESS_OK

MSG_HISTORY_WRITE_ERROR

MSG_NO_PREVIOUS_HISTORY

MSG_NO_TRACKER_ENTRY

MSG_STALE_FILE

MSG_UNSUPPORTED_FILETYPE

ConfigManager

LoadConfig

ObjectInfo

TableConfig

initialize_config

FabricChecksumMismatchError

FabricConfigError

FabricIntegrityError

FabricLoadError

FabricStaleFileError

get_log_file_path

init_logging

log_segment

save_log_file_to_table

get_spark

check_source_unchanged

dataframe_to_bronze

dataframe_to_bronze_old

load_csvwithschema_bronze

load_dataframe_bronze_history

load_json_bronze

load_notebook_bronze

load_notebook_midbronze

load_notebook_postbronze

load_notebook_prebronze

load_parquet_bronze

load_xlsx_bronze

load_xml_bronze

pull_files

refresh_table

truncate_bronze_table

get_tracker_file_path

has_files_changed

load_previous_snapshot

save_snapshot

ViolationRegistry

layer

run

def run(tablefile: str, config_manager: ConfigManager = None) -> str | None

Runs the bronze loader process for a specified table configuration and pulls files from the source, processes them, and loads them into the bronze layer.

This function initializes the table configuration based on the specified table file and checks its active state for the bronze process. If the table's configuration specifies pre-processing or post-processing notebooks, they are executed accordingly. Files from the source are pulled, processed, and loaded into the bronze layer based on their specified file types. Supported file types include CSV, JSON, XML, Parquet, and Notebook. The function also handles exceptions and ensures logs are saved correctly.

Arguments:

  • tablefile str - Path to the YAML file representing a table's configuration.
  • config_manager ConfigManager - An instance of ConfigManager used for accessing the application's configuration settings.

Returns:

  • str - A message indicating the outcome of the loading process, such as the number of files loaded or an error message in case of failure.

Raises:

  • Exception - If the config_manager is not initialized, no active configuration can be found for the table, or the filetype is unsupported. @sidebar_position 3

dataframeloader

def dataframeloader(data_frame: DataFrame,
load_config: LoadConfig,
table_config: TableConfig,
config_manager: ConfigManager = None) -> str | None

Loads a DataFrame into a specified data platform table using the provided configuration and manager.

This function handles the loading operation by using detailed configurations for the DataFrame, table, and the application configuration manager. It sets up logging, ensures required parameters are initialized, and supports specific settings for different layers (e.g., bronze layer). The function handles exception logging and provides mechanisms to stop processing upon encountering errors based on configuration settings.

Arguments:

  • data_frame DataFrame - The data to be loaded into the specified table.
  • load_config LoadConfig - Contains configuration for the loading process, including destination table.
  • table_config TableConfig - Holds table-specific settings, e.g., table name identifiers and layers.
  • config_manager ConfigManager - Manages and validates application-level configurations.

Returns:

  • str - Message indicating the result of the DataFrame loading process, including the target table name and error details if applicable.

Raises:

  • Exception - If the destination table name is missing from LoadConfig.
  • Exception - If the ConfigManager is not properly initialized.

cleanup_bronze_files

def cleanup_bronze_files(config_manager: ConfigManager = None) -> dict

Cleans up old files and partitions in the Bronze Lakehouse depending on the configuration settings on the Table, Connection, or global Lakehouse level.

Arguments:

  • config_manager ConfigManager - An instance of ConfigManager.

Returns:

  • dict - A dictionary containing metrics of the cleanup: deleted_files, bytes_deleted, and skipped_objects.