Help me een Python-script implementeren voor gegevensomzetting en laden in een database
description
Door deze prompt te gebruiken, stroomlijn je het gegevenslaadproces, minimaliseer je fouten en zorg je dat je gegevens correct worden geformatteerd en geïntegreerd in je databasesysteem.
prompt
try_prompt
Ik wil een Python-script maken dat gegevens transformeert en laadt van een bronbestand in mijn database. Het script moet verschillende gegevensformaten afhandelen, de ... more
generate_helper
...
ai_answers
provider: openai
model: gpt-4.1-nano
Hier is een voorbeeld van een Python-script dat gegevens uit een CSV-bestand leest, verschillende formaten afhandelt, transformaties toepast en de gegevens in een MySQL-database laadt. Dit script gebruikt de bibliotheken pandas en mysql-connector-python.
```python
import pandas as pd
import mysql.connector
from mysql.connector import errorcode
# Configuratie van databaseverbinding
db_config = {
'user': 'jouw_gebruikersnaam',
'password': 'jouw_wachtwoord',
'host': 'localhost',
'database': 'jouw_database'
}
# Lees het CSV-bestand
df = pd.read_csv('gegevens.csv')
# Data transformaties (voorbeeld)
# Bijvoorbeeld, datums naar datetime objecten
if 'datum' in df.columns:
df['datum'] = pd.to_datetime(df['datum'], errors='coerce')
# Conversie van numerieke velden, met foutafhandeling
if 'bedrag' in df.columns:
df['bedrag'] = pd.to_numeric(df['bedrag'], errors='coerce')
# Gegevens controleren op null-waarden en verwijderen indien nodig
df.dropna(inplace=True)
# Verbinding maken met de database
try:
cnx = mysql.connector.connect(**db_config)
cursor = cnx.cursor()
# Voorbeeld: tabelstructuur
# CREATE TABLE gegevens (
# id INT AUTO_INCREMENT PRIMARY KEY,
# naam VARCHAR(100),
# datum DATE,
# bedrag DECIMAL(10,2)
# );
# Insert statements voorbereiden
insert_stmt = (
"INSERT INTO gegevens (naam, datum, bedrag) "
"VALUES (%s, %s, %s)"
)
# Data laden
for index, row in df.iterrows():
cursor.execute(insert_stmt, (
row['naam'],
row['datum'].date() if pd.notnull(row['datum']) else None,
row['bedrag']
))
# Committen
cnx.commit()
except mysql.connector.Error as err:
if err.errno == errorcode.ER_ACCESS_DENIED_ERROR:
print("Gebruikersnaam of wachtwoord verkeerd")
elif err.errno == errorcode.ER_BAD_DB_ERROR:
print("Database bestaat niet")
else:
print(err)
finally:
cursor.close()
cnx.close()
```
**Belangrijk:**
- Pas de `db_config` aan met jouw databasegegevens.
- Controleer of de tabel `gegevens` bestaat en de juiste structuur heeft.
- Voeg extra transformaties toe afhankelijk van je specifieke gegevens en formaten.
- Voor grote datasets kun je batching gebruiken voor efficiënter laden.
Wil je dat ik het script uitbreid of aanpas op basis van specifieke kolommen of formaten?
provider: deepseek
model: deepseek-chat
Hier is een Python-script voor het transformeren en laden van gegevens van CSV naar MySQL:
```python
import pandas as pd
import mysql.connector
from mysql.connector import Error
import logging
from datetime import datetime
# Logging instellen
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
class DataTransformer:
def __init__(self, csv_bestand, db_config):
self.csv_bestand = csv_bestand
self.db_config = db_config
self.dataframe = None
def lees_csv(self):
"""Leest het CSV-bestand in met foutafhandeling"""
try:
self.dataframe = pd.read_csv(
self.csv_bestand,
encoding='utf-8',
sep=',',
na_values=['', 'NULL', 'null', 'NaN'],
keep_default_na=True
)
logging.info(f"CSV-bestand succesvol ingelezen: {len(self.dataframe)} rijen gevonden")
return True
except FileNotFoundError:
logging.error(f"Bestand niet gevonden: {self.csv_bestand}")
return False
except Exception as e:
logging.error(f"Fout bij het lezen van CSV: {str(e)}")
return False
def transformeer_gegevens(self):
"""Voert gegevenstransformaties uit"""
if self.dataframe is None:
logging.error("Geen data beschikbaar voor transformatie")
return False
try:
# Verwijder dubbele rijen
oorspronkelijk_aantal = len(self.dataframe)
self.dataframe = self.dataframe.drop_duplicates()
logging.info(f"Duplicaten verwijderd: {oorspronkelijk_aantal - len(self.dataframe)} rijen")
# Verwerk ontbrekende waarden
self._verwerk_ontbrekende_waarden()
# Data type conversies
self._converteer_data_types()
# Valideer gegevens
self._valideer_gegevens()
logging.info("Gegevenstransformaties succesvol voltooid")
return True
except Exception as e:
logging.error(f"Fout tijdens transformatie: {str(e)}")
return False
def _verwerk_ontbrekende_waarden(self):
"""Handelt ontbrekende waarden af"""
for kolom in self.dataframe.columns:
if self.dataframe[kolom].isnull().sum() > 0:
if self.dataframe[kolom].dtype in ['int64', 'float64']:
# Voor numerieke kolommen: vul aan met mediaan
self.dataframe[kolom].fillna(self.dataframe[kolom].median(), inplace=True)
elif self.dataframe[kolom].dtype == 'object':
# Voor tekstkolommen: vul aan met 'Onbekend'
self.dataframe[kolom].fillna('Onbekend', inplace=True)
logging.info(f"Ontbrekende waarden verwerkt in kolom: {kolom}")
def _converteer_data_types(self):
"""Converteert data types naar geschikte MySQL types"""
type_mapping = {
'int64': 'INT',
'float64': 'DECIMAL(10,2)',
'object': 'VARCHAR(255)',
'bool': 'BOOLEAN',
'datetime64[ns]': 'DATETIME'
}
for kolom in self.dataframe.columns:
# Converteer datum/tijd kolommen
if 'datum' in kolom.lower() or 'date' in kolom.lower():
try:
self.dataframe[kolom] = pd.to_datetime(self.dataframe[kolom], errors='coerce')
except:
pass
# Converteer numerieke kolommen
if self.dataframe[kolom].dtype == 'object':
try:
self.dataframe[kolom] = pd.to_numeric(self.dataframe[kolom], errors='ignore')
except:
pass
def _valideer_gegevens(self):
"""Voert basisgegevensvalidatie uit"""
# Controleer op negatieve waarden in numerieke kolommen
numerieke_kolommen = self.dataframe.select_dtypes(include=['int64', 'float64']).columns
for kolom in numerieke_kolommen:
if (self.dataframe[kolom] < 0).any():
logging.warning(f"Negatieve waarden gevonden in kolom: {kolom}")
logging.info("Gegevensvalidatie voltooid")
def maak_database_verbinding(self):
"""Maakt verbinding met MySQL database"""
try:
verbinding = mysql.connector.connect(**self.db_config)
logging.info("Verbinding met database succesvol")
return verbinding
except Error as e:
logging.error(f"Fout bij databaseverbinding: {str(e)}")
return None
def laad_naar_database(self, tabel_naam):
"""Laadt getransformeerde gegevens naar MySQL"""
if self.dataframe is None:
logging.error("Geen data beschikbaar voor laden")
return False
verbinding = self.maak_database_verbinding()
if not verbinding:
return False
try:
cursor = verbinding.cursor()
# Maak tabel aan als deze niet bestaat
self._maak_tabel(cursor, tabel_naam)
# Bereid INSERT query voor
plaatshouders = ', '.join(['%s'] * len(self.dataframe.columns))
kolommen = ', '.join(self.dataframe.columns)
insert_query = f"INSERT INTO {tabel_naam} ({kolommen}) VALUES ({plaatshouders})"
# Voer batch insert uit
data_tuples = [tuple(x) for x in self.dataframe.to_numpy()]
cursor.executemany(insert_query, data_tuples)
verbinding.commit()
logging.info(f"Succesvol {len(data_tuples)} rijen geladen in tabel: {tabel_naam}")
return True
except Error as e:
logging.error(f"Fout bij het laden naar database: {str(e)}")
verbinding.rollback()
return False
finally:
if verbinding.is_connected():
cursor.close()
verbinding.close()
def _maak_tabel(self, cursor, tabel_naam):
"""Creëert de doeltabel als deze niet bestaat"""
kolom_definities = []
for kolom in self.dataframe.columns:
dtype = str(self.dataframe[kolom].dtype)
if dtype in ['int64']:
mysql_type = 'INT'
elif dtype in ['float64']:
mysql_type = 'DECIMAL(10,2)'
elif 'datetime' in dtype:
mysql_type = 'DATETIME'
else:
mysql_type = 'VARCHAR(255)'
kolom_definities.append(f"`{kolom}` {mysql_type}")
create_table_query = f"""
CREATE TABLE IF NOT EXISTS {tabel_naam} (
id INT AUTO_INCREMENT PRIMARY KEY,
{', '.join(kolom_definities)},
geladen_op TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
"""
cursor.execute(create_table_query)
# Hoofdprogramma
def main():
# Database configuratie
db_config = {
'host': 'localhost',
'database': 'jouw_database_naam',
'user': 'jouw_gebruikersnaam',
'password': 'jouw_wachtwoord',
'port': 3306
}
# Initialiseer transformer
transformer = DataTransformer('gegevens.csv', db_config)
# Uitvoeren ETL proces
if transformer.lees_csv():
if transformer.transformeergegevens():
if transformer.laad_naar_database('getransformeerde_gegevens'):
logging.info("ETL proces succesvol voltooid!")
else:
logging.error("Fout bij het laden naar database")
else:
logging.error("Fout tijdens gegevenstransformatie")
else:
logging.error("Fout bij het inlezen van CSV-bestand")
if __name__ == "__main__":
main()
```
**Installatievereisten:**
```bash
pip install pandas mysql-connector-python
```
**Belangrijke kenmerken:**
1. **Foutafhandeling**: Uitgebreide logging en exception handling
2. **Gegevenstransformatie**:
- Verwijderen van duplicaten
- Afhandeling ontbrekende waarden
- Data type conversies
- Basisvalidatie
3. **Database integratie**:
- Automatisch aanmaken van tabellen
- Batch inserts voor prestaties
- Transactiebeheer
4. **Flexibiliteit**: Makkelijk aan te passen voor specifieke vereisten
**Gebruik:**
1. Pas de databaseconfiguratie aan
2. Zorg dat `gegevens.csv` in dezelfde map staat
3. Voer het script uit: `python script_naam.py`
Het script is modulair opgebouwd en eenvoudig uit te breiden met aanvullende transformaties of validaties.

