easyfabric.load_data_bronze
logging
datetime
DataFrame
config
MSG_CONFIG_NOT_INITIALIZED
MSG_CORRELATION_NO_ROWS
MSG_CORRELATION_OK
MSG_COUNT_ERROR
MSG_FILE_CHANGED
MSG_FILE_UNCHANGED
MSG_FILETYPE_NONE
MSG_FRESHNESS_OK
MSG_HISTORY_WRITE_ERROR
MSG_NO_PREVIOUS_HISTORY
MSG_NO_TRACKER_ENTRY
MSG_STALE_FILE
MSG_UNSUPPORTED_FILETYPE
ConfigManager
LoadConfig
ObjectInfo
TableConfig
initialize_config
FabricChecksumMismatchError
FabricConfigError
FabricIntegrityError
FabricLoadError
FabricStaleFileError
get_log_file_path
init_logging
log_segment
save_log_file_to_table
get_spark
check_source_unchanged
dataframe_to_bronze
dataframe_to_bronze_old
load_csvwithschema_bronze
load_dataframe_bronze_history
load_json_bronze
load_notebook_bronze
load_notebook_midbronze
load_notebook_postbronze
load_notebook_prebronze
load_parquet_bronze
load_xlsx_bronze
load_xml_bronze
pull_files
refresh_table
truncate_bronze_table
get_tracker_file_path
has_files_changed
load_previous_snapshot
save_snapshot
ViolationRegistry
layer
run
def run(tablefile: str, config_manager: ConfigManager = None) -> str | None
Runs the bronze loader process for a specified table configuration and pulls files from the source, processes them, and loads them into the bronze layer.
This function initializes the table configuration based on the specified table file and checks its active state for the bronze process. If the table's configuration specifies pre-processing or post-processing notebooks, they are executed accordingly. Files from the source are pulled, processed, and loaded into the bronze layer based on their specified file types. Supported file types include CSV, JSON, XML, Parquet, and Notebook. The function also handles exceptions and ensures logs are saved correctly.
Arguments:
tablefilestr - Path to the YAML file representing a table's configuration.config_managerConfigManager - An instance of ConfigManager used for accessing the application's configuration settings.
Returns:
str- A message indicating the outcome of the loading process, such as the number of files loaded or an error message in case of failure.
Raises:
Exception- If theconfig_manageris not initialized, no active configuration can be found for the table, or the filetype is unsupported. @sidebar_position 3
dataframeloader
def dataframeloader(data_frame: DataFrame,
load_config: LoadConfig,
table_config: TableConfig,
config_manager: ConfigManager = None) -> str | None
Loads a DataFrame into a specified data platform table using the provided configuration and manager.
This function handles the loading operation by using detailed configurations for the DataFrame, table, and the application configuration manager. It sets up logging, ensures required parameters are initialized, and supports specific settings for different layers (e.g., bronze layer). The function handles exception logging and provides mechanisms to stop processing upon encountering errors based on configuration settings.
Arguments:
data_frameDataFrame - The data to be loaded into the specified table.load_configLoadConfig - Contains configuration for the loading process, including destination table.table_configTableConfig - Holds table-specific settings, e.g., table name identifiers and layers.config_managerConfigManager - Manages and validates application-level configurations.
Returns:
str- Message indicating the result of the DataFrame loading process, including the target table name and error details if applicable.
Raises:
Exception- If the destination table name is missing from LoadConfig.Exception- If the ConfigManager is not properly initialized.
cleanup_bronze_files
def cleanup_bronze_files(config_manager: ConfigManager = None) -> dict
Cleans up old files and partitions in the Bronze Lakehouse depending on the configuration settings on the Table, Connection, or global Lakehouse level.
Arguments:
config_managerConfigManager - An instance of ConfigManager.
Returns:
dict- A dictionary containing metrics of the cleanup: deleted_files, bytes_deleted, and skipped_objects.