# Source: PPPDC_example / app.py — Hugging Face file view (uploader: JUNGU; commit 8fd3221 "Update app.py", verified; 19.5 kB).
import streamlit as st
import pandas as pd
import numpy as np
import plotly.express as px
import plotly.graph_objects as go
from io import StringIO
import openpyxl
import matplotlib.font_manager as fm
from scipy import stats
import os
import plotly.figure_factory as ff
#μ‚¬μ΄μ¦ˆ 크게
st.set_page_config(layout="wide")
# Korean font setup.
def set_font():
    """Register the Pretendard font file and return font settings.

    Returns:
        dict: settings keyed by 'font.family' and 'axes.unicode_minus'
        (presumably meant for matplotlib rcParams -- the visible code never
        applies them).
    """
    # Relative path; change it to the real location of the font file.
    bundled_font = "Pretendard-Bold.ttf"
    fm.fontManager.addfont(bundled_font)

    settings = {}
    settings['font.family'] = 'Pretendard-Bold'
    settings['axes.unicode_minus'] = False
    return settings
# Fetch the font settings (this also registers the font at import time).
font_settings = set_font()
# Session state initialization and management.
def manage_session_state():
    """Seed st.session_state with a default for every key the app relies on.

    Keys that already exist are left untouched, so values survive Streamlit
    reruns; only missing keys are created.
    """
    # Factories rather than shared instances: each call that has to create a
    # list/dict default gets a brand-new object.
    default_factories = (
        ('data', lambda: None),
        ('processed_data', lambda: None),
        ('numeric_columns', list),
        ('categorical_columns', list),
        ('x_var', lambda: None),
        ('y_var', lambda: None),
        ('slicers', dict),
        ('analysis_performed', lambda: False),
        ('filtered_data', lambda: None),
    )
    for state_key, make_default in default_factories:
        if state_key not in st.session_state:
            setattr(st.session_state, state_key, make_default())
def reset_session_state():
    """Return the app's session state to its initial, no-data condition.

    Plain state entries are overwritten with their defaults. The two entries
    that double as widget keys ('x_var' and 'y_var', used as selectbox keys in
    perform_analysis) are deleted instead of assigned: Streamlit raises a
    StreamlitAPIException when a widget-bound key is assigned after that
    widget has been instantiated in the current run, which is exactly the
    situation when this function is called from the reset button.
    manage_session_state recreates both keys with None on the next run.
    """
    # Reset the plain (non-widget) state entries.
    st.session_state.data = None
    st.session_state.processed_data = None
    st.session_state.filtered_data = None
    st.session_state.numeric_columns = []
    st.session_state.categorical_columns = []
    st.session_state.slicers = {}
    st.session_state.analysis_performed = False

    # Deleting a widget-bound key is permitted at any point in the run;
    # assigning to it is not once the widget exists.
    for widget_key in ('x_var', 'y_var'):
        if widget_key in st.session_state:
            del st.session_state[widget_key]
# Bundled example datasets. "name" is the label offered in the sample picker
# in main(); "file" is the file name that load_sample_data() resolves inside
# the "sample_data" directory.
SAMPLE_DATA_FILES = [
    {"name": "κ³Όλͺ©λ³„ λ…Έλ ₯κ³Ό 성취도", "file": "subject.xlsx"},
    {"name": "채점", "file": "score.xlsx"},
    {"name": "μΆœμ„μΌμˆ˜μ™€ 성적", "file": "attendance.xlsx"}
]
def load_sample_data(file_name):
    """Load one of the bundled example datasets from the "sample_data" folder.

    Args:
        file_name: File name (not a path) of the dataset; the extension
            selects the reader and is matched case-insensitively.

    Returns:
        pandas.DataFrame for .csv / .xls / .xlsx files, or None (after showing
        a Streamlit error) for any other extension.
    """
    # Path of the example data file.
    file_path = os.path.join("sample_data", file_name)

    # Compare on a lower-cased name so "DATA.CSV" or "Book.XLSX" are accepted,
    # consistent with load_data, which lower-cases the upload's extension.
    normalized_name = file_name.lower()
    if normalized_name.endswith('.csv'):
        return pd.read_csv(file_path)
    if normalized_name.endswith(('.xls', '.xlsx')):
        return pd.read_excel(file_path)

    st.error("μ§€μ›λ˜μ§€ μ•ŠλŠ” 파일 ν˜•μ‹μž…λ‹ˆλ‹€.")
    return None
# Data loading.
@st.cache_data
def load_data(file):
    """Read an uploaded CSV/XLS/XLSX file into a DataFrame.

    Args:
        file: Object exposing a ``name`` attribute and readable by pandas
            (in main() this is the value returned by st.file_uploader).

    Returns:
        pandas.DataFrame, or None (after showing a Streamlit error) when the
        extension is not csv, xls or xlsx.
    """
    suffix = file.name.rsplit('.', 1)[-1].lower()

    # Extension -> pandas reader.
    readers = {'csv': pd.read_csv, 'xls': pd.read_excel, 'xlsx': pd.read_excel}
    reader = readers.get(suffix)
    if reader is None:
        st.error("μ§€μ›λ˜μ§€ μ•ŠλŠ” 파일 ν˜•μ‹μž…λ‹ˆλ‹€. CSV, XLS, λ˜λŠ” XLSX νŒŒμΌμ„ μ—…λ‘œλ“œν•΄μ£Όμ„Έμš”.")
        return None
    frame = reader(file)

    # Give missing column names a default, position-based name.
    if frame.columns.isnull().any():
        relabeled = []
        for position, label in enumerate(frame.columns, start=1):
            relabeled.append(f'Column_{position}' if pd.isnull(label) else label)
        frame.columns = relabeled
    return frame
def manual_data_entry():
    """Let the user type a table by hand.

    The user first enters comma-separated column names; once at least one
    non-blank name exists, an editable empty table with the requested number
    of rows is shown.

    Returns:
        The value returned by st.data_editor, or None while no column name
        has been entered.
    """
    typed_names = st.text_input("μ—΄ 이름을 μ‰Όν‘œλ‘œ κ΅¬λΆ„ν•˜μ—¬ μž…λ ₯ν•˜μ„Έμš”:", key="manual_col_names")
    col_names = [piece.strip() for piece in typed_names.split(',') if piece.strip()]
    if not col_names:
        return None

    num_rows = st.number_input("초기 ν–‰μ˜ 수λ₯Ό μž…λ ₯ν•˜μ„Έμš”:", min_value=1, value=5, key="manual_num_rows")
    blank_table = pd.DataFrame(columns=col_names, index=range(num_rows))
    return st.data_editor(blank_table, num_rows="dynamic", key="manual_data_editor")
def preprocess_data(data):
    """Convert column types, resolve missing values and classify the columns.

    Steps:
      1. Every object-dtype column is passed through pd.to_numeric with
         errors='coerce'; the converted column is kept unless every value
         became NaN.
      2. For each column that still has missing values the user picks a
         strategy (drop rows / mean / median / mode) from a select box.
      3. The float64/int64 column names are stored in
         st.session_state.numeric_columns and all remaining column names in
         st.session_state.categorical_columns.

    Args:
        data: pandas.DataFrame to clean. It is copied first, so the caller's
            frame is never modified.

    Returns:
        pandas.DataFrame: the cleaned copy.
    """
    # Work on a copy so the caller's frame (the data-editor output) is not
    # mutated as a side effect.
    data = data.copy()

    # Data type inference and conversion.
    for column in data.columns:
        if data[column].dtype == 'object':
            try:
                # Try a numeric conversion; unparseable values become NaN.
                numeric_converted = pd.to_numeric(data[column], errors='coerce')
            except (ValueError, TypeError):
                # Only conversion failures are handled here; a bare except
                # would also swallow Streamlit's own control-flow exceptions.
                st.write(f"'{column}' 열은 λ²”μ£Όν˜•μœΌλ‘œ μœ μ§€λ©λ‹ˆλ‹€.")
                continue
            # Use the converted column unless every value turned into NaN.
            if not numeric_converted.isna().all():
                data[column] = numeric_converted
                st.write(f"'{column}' 열을 μˆ«μžν˜•μœΌλ‘œ λ³€ν™˜ν–ˆμŠ΅λ‹ˆλ‹€.")

    # Strategy labels, defined once so the select box options and the
    # comparisons below can never drift apart.
    drop_rows = "제거"
    fill_mean = "ν‰κ· μœΌλ‘œ λŒ€μ²΄"
    fill_median = "μ€‘μ•™κ°’μœΌλ‘œ λŒ€μ²΄"
    fill_mode = "μ΅œλΉˆκ°’μœΌλ‘œ λŒ€μ²΄"

    # Missing value handling.
    if data.isnull().sum().sum() > 0:
        st.write("결츑치 처리:")
        for column in data.columns:
            if data[column].isnull().sum() > 0:
                method = st.selectbox(f"{column} μ—΄μ˜ 처리 방법 선택:",
                                      [drop_rows, fill_mean, fill_median, fill_mode],
                                      key=f"missing_{column}")
                if method == drop_rows:
                    data = data.dropna(subset=[column])
                elif method == fill_mean:
                    if pd.api.types.is_numeric_dtype(data[column]):
                        # Assign the result back: fillna(inplace=True) on
                        # data[column] is chained assignment and does not
                        # update the frame under pandas Copy-on-Write.
                        data[column] = data[column].fillna(data[column].mean())
                    else:
                        st.warning(f"{column} 열은 μˆ«μžν˜•μ΄ μ•„λ‹ˆμ–΄μ„œ ν‰κ· κ°’μœΌλ‘œ λŒ€μ²΄ν•  수 μ—†μŠ΅λ‹ˆλ‹€.")
                elif method == fill_median:
                    if pd.api.types.is_numeric_dtype(data[column]):
                        data[column] = data[column].fillna(data[column].median())
                    else:
                        st.warning(f"{column} 열은 μˆ«μžν˜•μ΄ μ•„λ‹ˆμ–΄μ„œ μ€‘μ•™κ°’μœΌλ‘œ λŒ€μ²΄ν•  수 μ—†μŠ΅λ‹ˆλ‹€.")
                elif method == fill_mode:
                    modes = data[column].mode()
                    # mode() is empty when the column holds no values at all;
                    # there is nothing to fill with, so leave the column as is
                    # instead of raising IndexError.
                    if not modes.empty:
                        data[column] = data[column].fillna(modes.iloc[0])

    # Split the columns into numeric and categorical.
    st.session_state.numeric_columns = data.select_dtypes(include=['float64', 'int64']).columns.tolist()
    st.session_state.categorical_columns = data.select_dtypes(exclude=['float64', 'int64']).columns.tolist()
    return data
def update_filtered_data():
    """Recompute st.session_state.filtered_data from the processed data.

    Registered as the on_change callback of the slicer multiselects.
    """
    source_frame = st.session_state.processed_data
    st.session_state.filtered_data = apply_slicers(source_frame)
def create_slicers(data):
    """Render one multiselect "slicer" per low-cardinality categorical column.

    A slicer is created for every column listed in
    st.session_state.categorical_columns that exists in ``data`` and has at
    most 10 distinct values. All values are selected by default and the
    current selection is stored in st.session_state.slicers[column].

    Args:
        data: pandas.DataFrame whose categorical columns are offered as filters.
    """
    for col in st.session_state.categorical_columns:
        if col not in data.columns or data[col].nunique() > 10:
            continue

        distinct_values = data[col].unique()
        try:
            choices = sorted(distinct_values)
        except TypeError:
            # Values of different types cannot be compared, e.g. strings next
            # to NaN in an object column whose missing values were left in
            # place. Order by text form instead of crashing.
            choices = sorted(distinct_values, key=str)

        st.session_state.slicers[col] = st.multiselect(
            f"{col} 선택",
            options=choices,
            default=choices,
            key=f"slicer_{col}",
            on_change=update_filtered_data
        )
def apply_slicers(data):
    """Return a copy of ``data`` restricted to the values chosen in the slicers.

    Slicers with an empty selection, or for columns that are not in the
    frame, do not filter anything.

    Args:
        data: pandas.DataFrame to filter.

    Returns:
        pandas.DataFrame: filtered copy; the input is not modified.
    """
    result = data.copy()
    selections = st.session_state.slicers
    for column_name in selections:
        chosen = selections[column_name]
        if column_name not in result.columns or not chosen:
            continue
        keep_mask = result[column_name].isin(chosen)
        result = result[keep_mask]
    return result
def plot_correlation_heatmap(data):
    """Draw a correlation heatmap of the numeric columns, or warn if none exist.

    Args:
        data: pandas.DataFrame containing the columns named in
            st.session_state.numeric_columns.
    """
    numeric_part = data[st.session_state.numeric_columns]
    if numeric_part.empty:
        st.warning("상관관계 νžˆνŠΈλ§΅μ„ 그릴 수 μžˆλŠ” μˆ«μžν˜• 열이 μ—†μŠ΅λ‹ˆλ‹€.")
        return

    # Correlations lie in [-1, 1], so pin the colour scale to that range.
    heatmap = px.imshow(numeric_part.corr(), color_continuous_scale='RdBu_r', zmin=-1, zmax=1)
    heatmap.update_layout(title='상관관계 히트맡')
    st.plotly_chart(heatmap)
def check_normality(data, column):
    """Show a Q-Q plot and a Shapiro-Wilk test result for one column.

    Args:
        data: pandas.DataFrame holding the column.
        column: Name of the column to examine.
    """
    sample = data[column]

    # Visual check: Q-Q plot. probplot returns the ordered theoretical and
    # sample quantiles plus the least-squares line fitted through them.
    (theoretical_q, sample_q), (fit_slope, fit_intercept, _fit_r) = stats.probplot(sample, dist="norm")
    qq_figure = go.Figure()
    qq_figure.add_trace(go.Scatter(x=theoretical_q, y=sample_q, mode='markers', name='Sample Quantiles'))
    qq_figure.add_trace(go.Scatter(x=theoretical_q, y=fit_slope * theoretical_q + fit_intercept, mode='lines', name='Theoretical Quantiles'))
    qq_figure.update_layout(title=f'Q-Q Plot for {column}', xaxis_title='Theoretical Quantiles', yaxis_title='Sample Quantiles')
    st.plotly_chart(qq_figure)

    # Statistical check: Shapiro-Wilk test.
    stat, p = stats.shapiro(sample)
    st.write(f"Shapiro-Wilk Test for {column}:")
    st.write(f"ν†΅κ³„λŸ‰: {stat:.4f}")
    st.write(f"p-value: {p:.4f}")
    verdict = (
        "데이터가 μ •κ·œ 뢄포λ₯Ό λ”°λ₯΄λŠ” κ²ƒμœΌλ‘œ λ³΄μž…λ‹ˆλ‹€ (귀무가섀을 κΈ°κ°ν•˜μ§€ λͺ»ν•¨)"
        if p > 0.05
        else "데이터가 μ •κ·œ 뢄포λ₯Ό λ”°λ₯΄μ§€ μ•ŠλŠ” κ²ƒμœΌλ‘œ λ³΄μž…λ‹ˆλ‹€ (귀무가섀 기각)"
    )
    st.write(verdict)
def perform_independent_ttest(data, group_column, value_column):
    """Run an independent two-sample t-test between the two groups of a column.

    NOTE(review): this file defines perform_independent_ttest a second time
    further down with a five-parameter signature; at import time that later
    definition replaces this one, so this version is not reachable by name.

    Args:
        data: pandas.DataFrame with the observations.
        group_column: Column whose (exactly two) distinct values define the groups.
        value_column: Numeric column compared between the groups.
    """
    groups = data[group_column].unique()
    if len(groups) != 2:
        st.error("독립 ν‘œλ³Έ t-검정은 μ •ν™•νžˆ 두 그룹이 ν•„μš”ν•©λ‹ˆλ‹€.")
        return

    first_label, second_label = groups
    first_values = data.loc[data[group_column] == first_label, value_column]
    second_values = data.loc[data[group_column] == second_label, value_column]
    t_stat, p_value = stats.ttest_ind(first_values, second_values)

    st.write(f"독립 ν‘œλ³Έ T-κ²€μ • κ²°κ³Ό ({group_column} κΈ°μ€€, {value_column} 비ꡐ):")
    st.write(f"κ·Έλ£Ή: {first_label} vs {second_label}")
    st.write(f"t-ν†΅κ³„λŸ‰: {t_stat:.4f}")
    st.write(f"p-value: {p_value:.4f}")
    conclusion = (
        "두 κ·Έλ£Ή 간에 ν†΅κ³„μ μœΌλ‘œ μœ μ˜ν•œ 차이가 μžˆμŠ΅λ‹ˆλ‹€."
        if p_value < 0.05
        else "두 κ·Έλ£Ή 간에 ν†΅κ³„μ μœΌλ‘œ μœ μ˜ν•œ 차이가 μ—†μŠ΅λ‹ˆλ‹€."
    )
    st.write(conclusion)
def perform_paired_ttest(data, column1, column2):
    """Run a paired-sample t-test between two columns of the same frame.

    Args:
        data: pandas.DataFrame with the paired observations (one pair per row).
        column1: First numeric column.
        column2: Second numeric column.
    """
    first = data[column1]
    second = data[column2]

    # NOTE(review): both Series come from the same DataFrame, so their
    # lengths are always equal and this branch is effectively never taken.
    if len(first) != len(second):
        st.error("λŒ€μ‘ ν‘œλ³Έ t-검정을 μœ„ν•΄μ„œλŠ” 두 μ—΄μ˜ 데이터 μˆ˜κ°€ κ°™μ•„μ•Ό ν•©λ‹ˆλ‹€.")
        return

    t_stat, p_value = stats.ttest_rel(first, second)
    st.write(f"λŒ€μ‘ ν‘œλ³Έ T-κ²€μ • κ²°κ³Ό ({column1} vs {column2}):")
    st.write(f"t-ν†΅κ³„λŸ‰: {t_stat:.4f}")
    st.write(f"p-value: {p_value:.4f}")
    conclusion = (
        f"{column1}κ³Ό {column2} 간에 ν†΅κ³„μ μœΌλ‘œ μœ μ˜ν•œ 차이가 μžˆμŠ΅λ‹ˆλ‹€."
        if p_value < 0.05
        else f"{column1}κ³Ό {column2} 간에 ν†΅κ³„μ μœΌλ‘œ μœ μ˜ν•œ 차이가 μ—†μŠ΅λ‹ˆλ‹€."
    )
    st.write(conclusion)
def perform_onesample_ttest(data, column, test_value):
    """Run a one-sample t-test of a column's mean against ``test_value``.

    Args:
        data: pandas.DataFrame holding the column.
        column: Numeric column to test.
        test_value: Hypothesised population mean.
    """
    outcome = stats.ttest_1samp(data[column], test_value)
    t_stat, p_value = outcome

    st.write("단일 ν‘œλ³Έ T-κ²€μ • κ²°κ³Ό:")
    st.write(f"t-ν†΅κ³„λŸ‰: {t_stat:.4f}")
    st.write(f"p-value: {p_value:.4f}")
    conclusion = (
        f"ν‘œλ³Έ 평균이 {test_value}와 μœ μ˜ν•˜κ²Œ λ‹€λ¦…λ‹ˆλ‹€."
        if p_value < 0.05
        else f"ν‘œλ³Έ 평균이 {test_value}와 μœ μ˜ν•˜κ²Œ λ‹€λ₯΄μ§€ μ•ŠμŠ΅λ‹ˆλ‹€."
    )
    st.write(conclusion)
def plot_scatter_with_regression(data, x_var, y_var):
    """Plot y against x with a fitted regression line and print fit statistics.

    Each point carries a faint vertical error bar whose length is the
    absolute residual of that point from the fitted line.

    Args:
        data: pandas.DataFrame holding both columns.
        x_var: Name of the column used on the x axis.
        y_var: Name of the column used on the y axis.
    """
    x = data[x_var]
    y = data[y_var]

    # Fit the simple linear regression.
    fit = stats.linregress(x, y)
    slope, intercept = fit.slope, fit.intercept
    r_value, p_value, std_err = fit.rvalue, fit.pvalue, fit.stderr

    # Predictions and residuals.
    y_pred = slope * x + intercept
    residuals = y - y_pred
    r_squared = r_value ** 2

    # Scatter trace (with the residual-sized error bars).
    residual_bars = dict(
        type='data',
        array=abs(residuals),
        visible=True,
        color='rgba(0, 0, 0, 0.1)',
        thickness=0.5,
        width=0
    )
    points_trace = go.Scatter(
        x=x,
        y=y,
        mode='markers',
        name='Data Points',
        marker=dict(color='rgba(0, 0, 255, 0.7)', size=10),
        error_y=residual_bars
    )

    # Regression line trace.
    line_trace = go.Scatter(
        x=x,
        y=y_pred,
        mode='lines',
        name='Regression Line',
        line=dict(color='red', width=2)
    )

    # Boxed annotation with the fitted equation, anchored in paper coordinates.
    equation_box = dict(
        x=0.05,
        y=0.95,
        xref='paper',
        yref='paper',
        text=f'y = {slope:.2f}x + {intercept:.2f}<br>RΒ² = {r_squared:.3f}',
        showarrow=False,
        bgcolor='rgba(255, 255, 255, 0.8)',
        bordercolor='rgba(0, 0, 0, 0.3)',
        borderwidth=1
    )

    fig = go.Figure()
    fig.add_trace(points_trace)
    fig.add_trace(line_trace)
    fig.update_layout(
        title=f'{x_var}와 {y_var}의 관계 (R-squared: {r_squared:.3f})',
        xaxis_title=x_var,
        yaxis_title=y_var,
        showlegend=True,
        annotations=[equation_box]
    )
    st.plotly_chart(fig)

    # Additional statistics.
    st.write(f"μƒκ΄€κ³„μˆ˜: {r_value:.4f}")
    st.write(f"p-value: {p_value:.4f}")
    st.write(f"ν‘œμ€€ 였차: {std_err:.4f}")
def get_active_slicers():
    """Return the slicers that currently have at least one selected value.

    Returns:
        dict: column name -> selected values, in the order the slicers are
        stored in st.session_state.slicers.
    """
    active = {}
    for column_name, chosen in st.session_state.slicers.items():
        if chosen:
            active[column_name] = chosen
    return active
def perform_independent_ttest(data, group_column, group1, group2, value_column):
    """Run an independent two-sample t-test between two chosen groups.

    This definition has the same name as an earlier three-parameter function
    in this file and therefore replaces it; perform_analysis calls this
    five-parameter form.

    Args:
        data: pandas.DataFrame with the observations.
        group_column: Column that holds the group labels.
        group1: Label of the first group.
        group2: Label of the second group.
        value_column: Numeric column compared between the groups.
    """
    labels = data[group_column]
    first_values = data.loc[labels == group1, value_column]
    second_values = data.loc[labels == group2, value_column]
    t_stat, p_value = stats.ttest_ind(first_values, second_values)

    st.write(f"독립 ν‘œλ³Έ T-κ²€μ • κ²°κ³Ό ({group_column}: {group1} vs {group2}, {value_column} 비ꡐ):")
    st.write(f"t-ν†΅κ³„λŸ‰: {t_stat:.4f}")
    st.write(f"p-value: {p_value:.4f}")
    conclusion = (
        f"{group1}κ³Ό {group2} 간에 ν†΅κ³„μ μœΌλ‘œ μœ μ˜ν•œ 차이가 μžˆμŠ΅λ‹ˆλ‹€."
        if p_value < 0.05
        else f"{group1}κ³Ό {group2} 간에 ν†΅κ³„μ μœΌλ‘œ μœ μ˜ν•œ 차이가 μ—†μŠ΅λ‹ˆλ‹€."
    )
    st.write(conclusion)
def perform_analysis():
    """Render the exploratory-analysis page for the processed data.

    Layout: slicers on top, then three columns -- summary statistics and the
    correlation heatmap, a scatter plot with regression for two chosen
    variables, and the statistical tests (normality and t-tests) -- followed
    by a button that resets the app so another dataset can be analysed.

    Reads st.session_state.processed_data and writes
    st.session_state.filtered_data.
    """
    if st.session_state.filtered_data is None:
        st.session_state.filtered_data = st.session_state.processed_data.copy()
    st.header("탐색적 데이터 뢄석")

    # Create the slicers.
    create_slicers(st.session_state.processed_data)
    # Refresh the filtered data on every run so it reflects the slicers.
    st.session_state.filtered_data = apply_slicers(st.session_state.processed_data)

    # Three-column layout.
    col1, col2, col3 = st.columns(3)

    with col1:
        # Summary statistics.
        st.write("μš”μ•½ 톡계:")
        st.write(st.session_state.filtered_data.describe())
        # Correlation heatmap.
        st.subheader("상관관계 히트맡")
        plot_correlation_heatmap(st.session_state.filtered_data)

    with col2:
        # Scatter plot and regression for two user-selected variables.
        st.subheader("두 λ³€μˆ˜ κ°„μ˜ 관계 뢄석")
        x_var = st.selectbox("XμΆ• λ³€μˆ˜ 선택", options=st.session_state.numeric_columns, key='x_var')
        y_var = st.selectbox("YμΆ• λ³€μˆ˜ 선택", options=[col for col in st.session_state.numeric_columns if col != x_var], key='y_var')
        if x_var and y_var:
            plot_scatter_with_regression(st.session_state.filtered_data, x_var, y_var)

    with col3:
        st.subheader("톡계적 κ²€μ •")

        # Normality test.
        st.write("μ •κ·œμ„± κ²€μ •")
        normality_column = st.selectbox("μ •κ·œμ„± 검정을 μˆ˜ν–‰ν•  μ—΄ 선택:", st.session_state.numeric_columns, key='normality_column')
        if st.button("μ •κ·œμ„± κ²€μ • μˆ˜ν–‰"):
            check_normality(st.session_state.filtered_data, normality_column)

        # T-tests. The option labels are defined once so the radio options and
        # the comparisons below always match.
        st.write("T-κ²€μ •")
        independent_label = "독립 ν‘œλ³Έ"
        paired_label = "λŒ€μ‘ ν‘œλ³Έ"
        one_sample_label = "단일 ν‘œλ³Έ"
        test_type = st.radio("T-κ²€μ • μœ ν˜• 선택:", [independent_label, paired_label, one_sample_label], key="test_type_radio")

        if test_type == independent_label:
            active_slicers = get_active_slicers()
            if active_slicers:
                group_column = st.selectbox("κ·Έλ£Ή ꡬ뢄을 μœ„ν•œ μ—΄ 선택:", options=list(active_slicers.keys()))
                available_groups = active_slicers[group_column]
                group1 = st.selectbox("첫 번째 κ·Έλ£Ή 선택:", options=available_groups, key="group1")
                group2 = st.selectbox("두 번째 κ·Έλ£Ή 선택:",
                                      options=[g for g in available_groups if g != group1],
                                      key="group2")
                value_column = st.selectbox("비ꡐ할 값이 μžˆλŠ” μ—΄ 선택:", st.session_state.numeric_columns)
                if st.button("독립 ν‘œλ³Έ T-κ²€μ • μˆ˜ν–‰"):
                    if group1 and group2:
                        perform_independent_ttest(st.session_state.filtered_data, group_column, group1, group2, value_column)
                    else:
                        st.error("두 개의 μ„œλ‘œ λ‹€λ₯Έ 그룹을 μ„ νƒν•΄μ£Όμ„Έμš”.")
            else:
                st.warning("ν™œμ„±ν™”λœ μŠ¬λΌμ΄μ„œκ°€ μ—†μŠ΅λ‹ˆλ‹€. λ¨Όμ € μŠ¬λΌμ΄μ„œμ—μ„œ 그룹을 μ„ νƒν•΄μ£Όμ„Έμš”.")
        elif test_type == paired_label:
            column1 = st.selectbox("첫 번째 μ—΄ 선택:", st.session_state.numeric_columns, key="paired_col1")
            column2 = st.selectbox("두 번째 μ—΄ 선택:",
                                   [col for col in st.session_state.numeric_columns if col != column1],
                                   key="paired_col2")
            if st.button("λŒ€μ‘ ν‘œλ³Έ T-κ²€μ • μˆ˜ν–‰"):
                perform_paired_ttest(st.session_state.filtered_data, column1, column2)
        elif test_type == one_sample_label:
            test_column = st.selectbox("κ²€μ •ν•  μ—΄ 선택:", st.session_state.numeric_columns, key="one_sample_col")
            test_value = st.number_input("κ²€μ • κ°’ μž…λ ₯:", key="one_sample_value")
            if st.button("단일 ν‘œλ³Έ T-κ²€μ • μˆ˜ν–‰"):
                perform_onesample_ttest(st.session_state.filtered_data, test_column, test_value)

    # "Analyse another dataset" button. The reset runs as an on_click
    # callback: callbacks execute before the next script run instantiates any
    # widget, so reset_session_state may touch the 'x_var'/'y_var' keys that
    # are bound to the select boxes above. Calling it inline here (after those
    # widgets exist) makes Streamlit raise, and the click already triggers a
    # rerun, so the deprecated st.experimental_rerun() is not needed.
    st.button("λ‹€λ₯Έ 데이터 λΆ„μ„ν•˜κΈ°(였λ₯˜κ°€ λ‚˜λ©΄ λ‹€μ‹œ λˆŒλŸ¬μ£Όμ„Έμš”)", on_click=reset_session_state)
## Main
def main():
    """Entry point of the Streamlit app.

    While no dataset is loaded the user picks an input method (file upload,
    bundled example, or manual entry). Once data exists it is shown in an
    editable preview; starting the analysis preprocesses the edited data and
    renders the analysis page on this and every following rerun.
    """
    st.title("λͺ¨λ‘κ°€ ν•  수 μžˆλŠ” 데이터 뢄석 νˆ΄ν‚· Data Analysis for Everyone")
    st.link_button("λ§Œλ“ μ΄ μ½”λ‚œμŒ€", "https://www.youtube.com/@conanssam")
    manage_session_state()

    if st.session_state.data is None:
        input_modes = ("파일 μ—…λ‘œλ“œ", "μ˜ˆμ‹œ 데이터 μ‚¬μš©", "μˆ˜λ™ μž…λ ₯")
        upload_mode, sample_mode = input_modes[0], input_modes[1]
        data_input_method = st.radio("데이터 μž…λ ₯ 방법 선택:", input_modes, key="data_input_method")

        if data_input_method == upload_mode:
            uploaded_file = st.file_uploader("CSV, XLS, λ˜λŠ” XLSX νŒŒμΌμ„ μ„ νƒν•˜μ„Έμš”", type=["csv", "xls", "xlsx"], key="file_uploader")
            if uploaded_file is not None:
                st.session_state.data = load_data(uploaded_file)
        elif data_input_method == sample_mode:
            sample_names = [entry["name"] for entry in SAMPLE_DATA_FILES]
            sample_choice = st.selectbox(
                "μ˜ˆμ‹œ 데이터 선택",
                options=sample_names,
                format_func=lambda label: label
            )
            if st.button("μ„ νƒν•œ μ˜ˆμ‹œ 데이터 λ‘œλ“œ"):
                # First catalogue entry whose display name matches the choice.
                matching_files = (entry["file"] for entry in SAMPLE_DATA_FILES if entry["name"] == sample_choice)
                st.session_state.data = load_sample_data(next(matching_files))
        else:
            st.session_state.data = manual_data_entry()

    if st.session_state.data is not None:
        st.subheader("데이터 미리보기 및 μˆ˜μ •")
        st.write("데이터λ₯Ό ν™•μΈν•˜κ³  ν•„μš”ν•œ 경우 μˆ˜μ •ν•˜μ„Έμš”:")
        edited_data = st.data_editor(
            st.session_state.data,
            num_rows="dynamic",
            key="main_data_editor"
        )

        # The button is evaluated first so it is rendered on every run; once
        # the analysis has started, preprocessing is repeated on each rerun.
        start_clicked = st.button("데이터 뢄석 μ‹œμž‘", key="start_analysis")
        if start_clicked or st.session_state.analysis_performed:
            st.session_state.processed_data = preprocess_data(edited_data)
            st.session_state.analysis_performed = True

        if st.session_state.analysis_performed:
            perform_analysis()
# Run the app only when this file is executed as the main script.
if __name__ == "__main__":
    main()