programing

중복된 기본 키에서 Pandas to_sql이 실패함

megabox 2023. 10. 26. 20:56
반응형

중복된 기본 키에서 Pandas to_sql이 실패함

팬더를 이용해 기존 테이블에 추가하고 싶습니다.df.to_sql()기능.

설정했습니다if_exists='append', 하지만 내 테이블에는 기본 열쇠가 있습니다.

저는 다음과 같은 작업을 하고 싶습니다.insert ignore하려고 할 때는append기존 테이블에 중복 입력 오류가 발생하지 않도록 하겠습니다.

팬더로 가능한가요, 아니면 명시적인 쿼리를 작성해야 하나요?

안타깝게도 "INSERT GRANGE"를 지정할 수 있는 옵션이 없습니다.이렇게 해서 중복되지 않는 행을 데이터베이스에 삽입할 수 있게 되었습니다(데이터프레임 이름은 df).

for i in range(len(df)):
    try:
        df.iloc[i:i+1].to_sql(name="Table_Name",if_exists='append',con = Engine)
    except IntegrityError:
        pass #or any other action

당신은 이것을 할 수 있습니다.method의 매개 변수to_sql:

from sqlalchemy.dialects.mysql import insert

def insert_on_duplicate(table, conn, keys, data_iter):
    insert_stmt = insert(table.table).values(list(data_iter))
    on_duplicate_key_stmt = insert_stmt.on_duplicate_key_update(insert_stmt.inserted)
    conn.execute(on_duplicate_key_stmt)

df.to_sql('trades', dbConnection, if_exists='append', chunksize=4096, method=insert_on_duplicate)

이전 버전의 sqalchemy의 경우, 당신은 a를 통과해야 합니다.dict.on_duplicate_key_update,on_duplicate_key_stmt = insert_stmt.on_duplicate_key_update(dict(insert_stmt.inserted))

참고하시기 바랍니다."if_exists='append'"표의 존재 및 가 존재하지 않을 경우 수행할 작업과 관련됩니다.if_exists는 테이블의 내용과 관련이 없습니다.여기에서 문서를 참조하십시오. https://pandas.pydata.org/pandas-docs/stable/generated/pandas.DataFrame.to_sql.html

if_exists: {'fail', 'replace', 'append', 기본 'fail' 실패:테이블이 있으면 아무것도 하지 않습니다.바꾸기:테이블이 존재하는 경우 해당 테이블을 떨어뜨리고 재생성한 후 데이터를 삽입합니다. append:표가 존재하는 경우 데이터를 삽입합니다.존재하지 않는 경우 작성합니다.

팬더는 현재로서는 선택의 여지가 없지만, 여기에 Github 문제가 있습니다.당신도 이 기능이 필요하다면, 그냥 찬성표를 던지세요.

위의 for loop 방법은 상황을 상당히 느리게 만듭니다.sql 쿼리에 대한 사용자 지정을 위해 panda.to _sql에 전달할 수 있는 메서드 매개 변수가 있습니다.

https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.to_sql.html#pandas.DataFrame.to_sql

아래 코드는 postgres에 대해 작동해야 하며 기본 키 "unique_code"와 충돌이 발생할 경우 아무것도 수행하지 않습니다.db의 삽입 방언을 변경합니다.

def insert_do_nothing_on_conflicts(sqltable, conn, keys, data_iter):
    """
    Execute SQL statement inserting data

    Parameters
    ----------
    sqltable : pandas.io.sql.SQLTable
    conn : sqlalchemy.engine.Engine or sqlalchemy.engine.Connection
    keys : list of str
        Column names
    data_iter : Iterable that iterates the values to be inserted
    """
    from sqlalchemy.dialects.postgresql import insert
    from sqlalchemy import table, column
    columns=[]
    for c in keys:
        columns.append(column(c))

    if sqltable.schema:
        table_name = '{}.{}'.format(sqltable.schema, sqltable.name)
    else:
        table_name = sqltable.name

    mytable = table(table_name, *columns)

    insert_stmt = insert(mytable).values(list(data_iter))
    do_nothing_stmt = insert_stmt.on_conflict_do_nothing(index_elements=['unique_code'])

    conn.execute(do_nothing_stmt)

pd.to_sql('mytable', con=sql_engine, method=insert_do_nothing_on_conflicts)

Pandas는 .to_sql 메서드의 실제 SQL 구문 편집을 지원하지 않으므로 운이 없을 수 있습니다.몇 가지 실험적인 프로그래밍 방식의 해결 방법이 있습니다. 예를 들어 데이터프레임을 다음과 같이 SQLLchemy 객체로 읽어 보십시오.CALCHIPAN트랜잭션에 대해서는 SQLLchemy를 사용합니다. 하지만 데이터프레임을 CSV에 기록하고 명시적인 MySQL 함수를 로드하면 더 나은 서비스를 제공받을 수 있습니다.

CALCHIPAN repo: https://bitbucket.org/zzzeek/calchipan/

무결성 오류가 계속 발생하는 데 문제가 있었습니다.

...이상하지만 전 그냥 위의 내용을 받아들여서 거꾸로 작업했습니다.

for i, row in df.iterrows():
    sql = "SELECT * FROM `Table_Name` WHERE `key` = '{}'".format(row.Key)
    found = pd.read_sql(sql, con=Engine)
    if len(found) == 0:
        df.iloc[i:i+1].to_sql(name="Table_Name",if_exists='append',con = Engine)

제 경우에는 빈 테이블에 새 데이터를 넣으려고 했지만, 일부 행이 중복되어 있고, 여기서도 거의 동일한 문제가 있습니다. 기존 데이터를 가져와 새로 얻은 데이터와 병합하여 처리하는 것을 "생각할 수도 있지만" 이는 최적이 아니며, 큰 테이블이 아니라 작은 데이터에만 작동할 수도 있습니다.

팬더는 지금 이 상황에 대해 어떤 종류의 대처 방법도 제공하지 않기 때문에, 저는 이에 대한 적절한 해결책을 찾고 있었기 때문에, 그것이 당신에게 효과가 있을지 없을지 확신할 수 없지만, 저는 그것이 효과가 있는지 아닌지를 기다리는 행운 대신에 먼저 데이터를 통제하기로 결정했습니다. 그래서 제가 한 일은 제가 전화하기 전에 중복을 제거하는 것입니다..to_sql따라서 오류가 발생할 경우 데이터에 대한 자세한 정보를 제공하고 현재 상황을 파악할 수 있습니다.

import pandas as pd


def write_to_table(table_name, data):
    df = pd.DataFrame(data)
    # Sort by price, so we remove the duplicates after keeping the lowest only
    data.sort(key=lambda row: row['price'])
    df.drop_duplicates(subset=['id_key'], keep='first', inplace=True)
    #
    df.to_sql(table_name, engine, index=False, if_exists='append', schema='public')

그래서 제 경우에는 가장 낮은 가격의 행을 유지하고 싶었습니다(btw 저는 여러 개의 행을 통과했습니다).dict위해서data), 그리고 이를 위해 먼저 정렬을 했습니다. 꼭 필요한 것은 아니지만 이것은 제가 보관하고자 하는 데이터를 제어하는 것에 대한 예시입니다.

이것이 저의 상황과 거의 비슷한 사람에게 도움이 되었으면 좋겠습니다.

SQL Server를 사용할 때 기본 키 제약 조건이 있는 테이블에 중복 값을 입력하면 SQL 오류가 발생합니다.테이블을 변경하여 수정할 수 있습니다.

CREATE TABLE [dbo].[DeleteMe](
[id] [uniqueidentifier] NOT NULL,
[Value] [varchar](max) NULL,
CONSTRAINT [PK_DeleteMe] 
PRIMARY KEY ([id] ASC) 
WITH (IGNORE_DUP_KEY = ON)); <-- add

https://dba.stackexchange.com/a/111771 에서 가져온 것입니다.

이제 당신의df.to_sql()다시 작동해야 합니다.

JayenHuy Tran의 해결책들은 저에게 많은 도움을 주었지만, 그것들은 바로 실행되지 않았습니다.제가 Jayen 코드에 직면한 문제는 DataFrame 열이 데이터베이스의 열과 정확히 일치해야 한다는 것입니다.데이터베이스에 기록하지 않을 DataFrame 열이 일부 있었기 때문에 제 경우에는 그렇지 않았습니다.
열 이름을 고려하도록 솔루션을 수정했습니다.

from sqlalchemy.dialects.mysql import insert
import itertools

def insertWithConflicts(sqltable, conn, keys, data_iter):
    """
    Execute SQL statement inserting data, whilst taking care of conflicts
    Used to handle duplicate key errors during database population
    This is my modification of the code snippet 
    from https://stackoverflow.com/questions/30337394/pandas-to-sql-fails-on-duplicate-primary-key

    The help page from https://docs.sqlalchemy.org/en/14/core/dml.html#sqlalchemy.sql.expression.Insert.values
    proved useful.
    
    Parameters
    ----------
    sqltable : pandas.io.sql.SQLTable
    conn : sqlalchemy.engine.Engine or sqlalchemy.engine.Connection
    keys : list of str
        Column names
    data_iter : Iterable that iterates the values to be inserted. It is a zip object.
                The length of it is equal to the chunck size passed in df_to_sql()
    """
    vals = [dict(zip(z[0],z[1])) for z in zip(itertools.cycle([keys]),data_iter)] 
    insertStmt = insert(sqltable.table).values(vals)
    doNothingStmt = insertStmt.on_duplicate_key_update(dict(insertStmt.inserted))
    conn.execute(doNothingStmt)

저는 같은 문제에 직면했고 제 테이블에 스키마가 생기기 시작할 때까지 @Huy Trans에서 제공하는 솔루션을 한동안 채택했습니다.저는 그의 답변을 좀 개선해야 했고 이것이 최종 결과입니다.

def do_nothing_on_conflicts(sql_table, conn, keys, data_iter):
"""
Execute SQL statement inserting data

Parameters
----------
sql_table : pandas.io.sql.SQLTable
conn : sqlalchemy.engine.Engine or sqlalchemy.engine.Connection
keys : list of str
    Column names
data_iter : Iterable that iterates the values to be inserted
"""
columns = []
for c in keys:
    columns.append(column(c))

if sql_table.schema:
    my_table = table(sql_table.name, *columns, schema=sql_table.schema)
    # table_name = '{}.{}'.format(sql_table.schema, sql_table.name)
else:
    my_table = table(sql_table.name, *columns)
    # table_name = sql_table.name

# my_table = table(table_name, *columns)

insert_stmt = insert(my_table).values(list(data_iter))
do_nothing_stmt = insert_stmt.on_conflict_do_nothing()

conn.execute(do_nothing_stmt)

사용방법:

history.to_sql('history', schema=schema, con=engine, method=do_nothing_on_conflicts)

이 아이디어는 @Nfern과 동일하지만 재귀 함수를 사용하여 각 반복에서 df를 절반으로 분할하여 무결성 위반을 일으키는 행/행을 건너뜁니다.

        def insert(df):

          try:
             # inserting into backup table
             df.to_sql("table",con=engine, if_exists='append',index=False,schema='schema') 
         except:
            rows = df.shape[0]
            if rows>1:
                df1 = df.iloc[:int(rows/2),:]
                df2 = df.iloc[int(rows/2):,:]
                insert(df1)
                insert(df2)
            else:
                print(f"{df} not inserted. Integrity violation, duplicate primary key/s")

@Jayen과 동일하지만 postgresql의 경우이며 충돌 논리에 대해 아무것도 수행하지 않습니다(sqalchemy 문서 참조).

from sqlalchemy.dialects.postgresql import insert

def insert_or_do_nothing_on_conflict(table, conn, keys, data_iter):
        insert_stmt = insert(table.table).values(list(data_iter))
        # you need to specify column(s) name(s) used to infer unique index
        on_duplicate_key_stmt = insert_stmt.on_conflict_do_nothing(index_elements=['column_index1', 'column_index2'])
        conn.execute(on_duplicate_key_stmt)


df.to_sql(
    name="table_name",
    schema="schema_name",
    con=engine,
    if_exists="append",
    index=False,
    method=insert_or_do_nothing_on_conflict
)

저는 이미 존재하는 ID를 명시적으로 검색하여 각각을 별도의 함수로 업데이트하거나 T2 테이블의 모든 데이터를 포함하는 하나의 데이터 프레임과 T1 테이블의 모든 데이터를 포함하는 다른 테이블을 얻을 수 있습니다. 그리고 당신은 당신의 일치하는 열이 같지 않을 때 ID에 가입하고 당신의 업데이트 문을 수행합니다.

언급URL : https://stackoverflow.com/questions/30337394/pandas-to-sql-fails-on-duplicate-primary-key

반응형