def test_integration_null_values(spark):
    """Check that ``integration`` copes with null values in its inputs.

    Each of the six SAP input tables is built as a one-row DataFrame with an
    explicit, fully nullable schema, and several columns are set to ``None``.
    The test passes when the result has at least one row and the first row
    carries ``None`` in ``MEINS`` and ``GLOBAL_MATERIAL_NUMBER``.

    Args:
        spark: Object providing ``createDataFrame`` (presumably the
            SparkSession test fixture -- confirm against conftest).
    """

    def single_row_frame(columns, row):
        """Build a one-row DataFrame whose columns are all nullable."""
        schema = StructType(
            [StructField(name, dtype(), True) for name, dtype in columns]
        )
        return spark.createDataFrame([row], schema)

    # Input tables: explicit types so that all-null columns keep a known type.
    mara_df = single_row_frame(
        [
            ("MANDT", StringType),
            ("MATNR", StringType),
            ("MEINS", StringType),
            ("GLOBAL_MATERIAL_NUMBER", StringType),
        ],
        ("100", "MAT1", None, None),
    )
    mbew_df = single_row_frame(
        [
            ("MANDT", StringType),
            ("MATNR", StringType),
            ("BWKEY", StringType),
            ("VPRSV", StringType),
            ("VERPR", DoubleType),
            ("STPRS", DoubleType),
            ("PEINH", LongType),
            ("BKLAS", StringType),
        ],
        ("100", "MAT1", "VAL1", None, None, None, None, None),
    )
    marc_df = single_row_frame(
        [
            ("SOURCE_SYSTEM_ERP", StringType),
            ("MATNR", StringType),
            ("WERKS", StringType),
        ],
        ("ERP1", "MAT1", "PLANT1"),
    )
    t001k_df = single_row_frame(
        [
            ("MANDT", StringType),
            ("BWKEY", StringType),
            ("BUKRS", StringType),
        ],
        ("100", "VAL1", None),
    )
    t001w_df = single_row_frame(
        [
            ("MANDT", StringType),
            ("WERKS", StringType),
            ("BWKEY", StringType),
            ("NAME1", StringType),
        ],
        ("100", "PLANT1", "VAL1", None),
    )
    t001_df = single_row_frame(
        [
            ("MANDT", StringType),
            ("BUKRS", StringType),
            ("WAERS", StringType),
        ],
        ("100", None, None),
    )

    # Run the code under test; arguments are passed positionally in the
    # order mara, mbew, marc, t001k, t001w, t001.
    result_df = integration(mara_df, mbew_df, marc_df, t001k_df, t001w_df, t001_df)

    # The result must contain at least one row.
    assert result_df.count() > 0, "Result DataFrame should not be empty"

    head_row = result_df.first()
    assert head_row is not None, "First row should not be None"

    # Columns that were null on input must still be null in the first row.
    assert head_row["MEINS"] is None, "MEINS column should be None"
    assert (
        head_row["GLOBAL_MATERIAL_NUMBER"] is None
    ), "GLOBAL_MATERIAL_NUMBER column should be None"