diff mbox series

[v2] hashserv: Postgres adaptations for ignoring duplicate inserts

Message ID 20240209091443.2737892-1-tobiasha@axis.com
State New
Headers show
Series [v2] hashserv: Postgres adaptations for ignoring duplicate inserts | expand

Commit Message

Tobias Hagelborn Feb. 9, 2024, 9:14 a.m. UTC
Hash Equivalence server performs unconditional insert also of duplicate
hash entries. This causes excessive error log entries in Postgres.
Rather ignore the duplicate inserts.

The alternate behavior should be isolated to the postgres
engine type.

Signed-off-by: Tobias Hagelborn <tobiasha@axis.com>
---

Updated return codes based on rows updated according to review comments.

 lib/hashserv/sqlalchemy.py | 41 ++++++++++++++++++++++++++++----------
 1 file changed, 31 insertions(+), 10 deletions(-)
diff mbox series

Patch

diff --git a/lib/hashserv/sqlalchemy.py b/lib/hashserv/sqlalchemy.py
index cee04bff..b17a1762 100644
--- a/lib/hashserv/sqlalchemy.py
+++ b/lib/hashserv/sqlalchemy.py
@@ -32,6 +32,7 @@  from sqlalchemy import (
 import sqlalchemy.engine
 from sqlalchemy.orm import declarative_base
 from sqlalchemy.exc import IntegrityError
+from sqlalchemy.dialects.postgresql import insert as postgres_insert
 
 Base = declarative_base()
 
@@ -287,16 +288,28 @@  class Database(object):
             return result.rowcount
 
     async def insert_unihash(self, method, taskhash, unihash):
-        statement = insert(UnihashesV2).values(
-            method=method,
-            taskhash=taskhash,
-            unihash=unihash,
-        )
+        # Postgres specific ignore on insert duplicate
+        if self.engine.name == 'postgresql':
+            statement = postgres_insert(UnihashesV2).values(
+                method=method,
+                taskhash=taskhash,
+                unihash=unihash,
+            )
+            statement = statement.on_conflict_do_nothing(
+                index_elements=("method", "taskhash")
+            )
+        else:
+            statement = insert(UnihashesV2).values(
+                method=method,
+                taskhash=taskhash,
+                unihash=unihash,
+            )
+
         self.logger.debug("%s", statement)
         try:
             async with self.db.begin():
-                await self.db.execute(statement)
-            return True
+                result = await self.db.execute(statement)
+                return result.rowcount != 0
         except IntegrityError:
             self.logger.debug(
                 "%s, %s, %s already in unihash database", method, taskhash, unihash
@@ -311,12 +324,20 @@  class Database(object):
         if "created" in data and not isinstance(data["created"], datetime):
             data["created"] = datetime.fromisoformat(data["created"])
 
-        statement = insert(OuthashesV2).values(**data)
+        # Postgres specific ignore on insert duplicate
+        if self.engine.name == 'postgresql':
+            statement = postgres_insert(OuthashesV2).values(**data)
+            statement = statement.on_conflict_do_nothing(
+                index_elements=("method", "taskhash", "outhash")
+            )
+        else:
+            statement = insert(OuthashesV2).values(**data)
+
         self.logger.debug("%s", statement)
         try:
             async with self.db.begin():
-                await self.db.execute(statement)
-            return True
+                result = await self.db.execute(statement)
+                return result.rowcount != 0
         except IntegrityError:
             self.logger.debug(
                 "%s, %s already in outhash database", data["method"], data["outhash"]