diff mbox series

[bitbake-devel,v2,8/8] bitbake: hashserv: Postgres adaptations for ignoring duplicate inserts

Message ID 20240218225953.2997239-9-JPEWhacker@gmail.com
State New
Headers show
Series Implement parallel Query API | expand

Commit Message

Joshua Watt Feb. 18, 2024, 10:59 p.m. UTC
From: Tobias Hagelborn <tobias.hagelborn@axis.com>

Hash Equivalence server performs unconditional insert also of duplicate
hash entries. This causes excessive error log entries in Postgres.
Rather ignore the duplicate inserts.

The alternate behavior should be isolated to the postgres
engine type.

Signed-off-by: Tobias Hagelborn <tobiasha@axis.com>
Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
Signed-off-by: Joshua Watt <JPEWhacker@gmail.com>
---
 bitbake/lib/hashserv/sqlalchemy.py | 53 +++++++++++++++++++++---------
 1 file changed, 38 insertions(+), 15 deletions(-)
diff mbox series

Patch

diff --git a/bitbake/lib/hashserv/sqlalchemy.py b/bitbake/lib/hashserv/sqlalchemy.py
index 0e28d738f5a..fc3ae3d3396 100644
--- a/bitbake/lib/hashserv/sqlalchemy.py
+++ b/bitbake/lib/hashserv/sqlalchemy.py
@@ -33,6 +33,7 @@  from sqlalchemy import (
 import sqlalchemy.engine
 from sqlalchemy.orm import declarative_base
 from sqlalchemy.exc import IntegrityError
+from sqlalchemy.dialects.postgresql import insert as postgres_insert
 
 Base = declarative_base()
 
@@ -283,9 +284,7 @@  class Database(object):
     async def unihash_exists(self, unihash):
         async with self.db.begin():
             result = await self._execute(
-                select(UnihashesV3)
-                .where(UnihashesV3.unihash == unihash)
-                .limit(1)
+                select(UnihashesV3).where(UnihashesV3.unihash == unihash).limit(1)
             )
 
             return result.first() is not None
@@ -435,18 +434,30 @@  class Database(object):
             return result.rowcount
 
     async def insert_unihash(self, method, taskhash, unihash):
-        try:
-            async with self.db.begin():
-                await self._execute(
-                    insert(UnihashesV3).values(
-                        method=method,
-                        taskhash=taskhash,
-                        unihash=unihash,
-                        gc_mark=self._get_config_subquery("gc-mark", ""),
-                    )
+        # Postgres specific ignore on insert duplicate
+        if self.engine.name == "postgresql":
+            statement = (
+                postgres_insert(UnihashesV3)
+                .values(
+                    method=method,
+                    taskhash=taskhash,
+                    unihash=unihash,
+                    gc_mark=self._get_config_subquery("gc-mark", ""),
                 )
+                .on_conflict_do_nothing(index_elements=("method", "taskhash"))
+            )
+        else:
+            statement = insert(UnihashesV3).values(
+                method=method,
+                taskhash=taskhash,
+                unihash=unihash,
+                gc_mark=self._get_config_subquery("gc-mark", ""),
+            )
 
-            return True
+        try:
+            async with self.db.begin():
+                result = await self._execute(statement)
+                return result.rowcount != 0
         except IntegrityError:
             self.logger.debug(
                 "%s, %s, %s already in unihash database", method, taskhash, unihash
@@ -461,10 +472,22 @@  class Database(object):
         if "created" in data and not isinstance(data["created"], datetime):
             data["created"] = datetime.fromisoformat(data["created"])
 
+        # Postgres specific ignore on insert duplicate
+        if self.engine.name == "postgresql":
+            statement = (
+                postgres_insert(OuthashesV2)
+                .values(**data)
+                .on_conflict_do_nothing(
+                    index_elements=("method", "taskhash", "outhash")
+                )
+            )
+        else:
+            statement = insert(OuthashesV2).values(**data)
+
         try:
             async with self.db.begin():
-                await self._execute(insert(OuthashesV2).values(**data))
-            return True
+                result = await self._execute(statement)
+                return result.rowcount != 0
         except IntegrityError:
             self.logger.debug(
                 "%s, %s already in outhash database", data["method"], data["outhash"]