Validation workflow completed.
🚨 ALERT: Critical validation failures found!
Failed steps: 2
--- Validation Summary Report ---
Total validation steps: 3
Passed steps: 1
Failed steps: 2
Highest severity: critical
--- End of Report ---
Final Actions
Execute actions after validation completes, such as sending alerts or generating summary reports.
Validation with Final Actions
Execute actions after validation completes, such as sending alerts or generating summary reports.
import pointblank as pb
def send_alert():
"""Check validation summary and send alert if critical failures found"""
summary = pb.get_validation_summary()
if summary and summary.get("highest_severity") == "critical":
print(f"🚨 ALERT: Critical validation failures found!")
print(f" Failed steps: {summary['n_failing_steps']}")
elif summary and summary.get("highest_severity") == "error":
print(f"⚠️ WARNING: Error-level validation failures detected.")
else:
print("✅ All validation checks passed successfully!")
def generate_summary_report():
"""Generate a summary report of validation results"""
summary = pb.get_validation_summary()
if summary:
print("\n--- Validation Summary Report ---")
print(f"Total validation steps: {summary['n_steps']}")
print(f"Passed steps: {summary['n_passing_steps']}")
print(f"Failed steps: {summary['n_failing_steps']}")
print(f"Highest severity: {summary['highest_severity']}")
print("--- End of Report ---")
validation = (
pb.Validate(
data=pb.load_dataset(dataset="game_revenue", tbl_type="polars"),
label="Validation with final actions",
thresholds=pb.Thresholds(warning=0.05, error=0.10, critical=0.15),
final_actions=pb.FinalActions(
"Validation workflow completed.", # String message
send_alert, # Alert function
generate_summary_report # Report function
)
)
.col_vals_regex(columns="player_id", pattern=r"[A-Z]{12}[0-9]{3}")
.col_vals_gt(columns="item_revenue", value=0.05)
.col_vals_gt(columns="session_duration", value=15)
.interrogate()
)
validationPreview of Input Table
PolarsRows2,000Columns11 |
|||||||||||