Sonntag, 7. November 2010

Werden parallele WorkItems in DROOLS (jBPM) wirklich parallel ausgeführt?

Ein einfacher Test zeigte, dass selbst parallele Workitems in einem Workflow nicht parallel ausgeführt werden, da DROOLS intern nur einen Ausführungsthread benutzt. Also muss innerhalb eines WorkItemHandlers die Logik in einen eigenen Thread ausgelagert werden damit der Aufruf von executeWorkitem sofort an die Prozessengine zurückkehrt.

Die Klasse StatefulKnowledgeSessionWrapper ist im Blogpost "Ist ein Drools Workflow theradsafe?" beschrieben, die globale Variable "Ctx" beinhaltet Hilfvariablen.

public class SucheWorkItemHandler implements WorkItemHandler {

  private ExecutorService executor;
  private StatefulKnowledgeSessionWrapper ksession;


  public SucheConcentWorkItemHandler(
    ExecutorService executor, 
    StatefulKnowledgeSessionWrapper k) {

        this.executor = executor;
        this.ksession = k;
  }


  @Override
  public void abortWorkItem(WorkItem workItem,
                            WorkItemManager workItemManager) {
        // 
  }

  @Override
  public void executeWorkItem(WorkItem workItem,
                              WorkItemManager workItemManager) {

     ProductCtx ctx = (ProductCtx)
         ksession.getSession().getGlobal("Ctx");
     executor.submit(new SucheImpl(ctx, workItem));
  }
}

Die Klasse SucheImpl enthält die Implementierung des Workitems als Callable. Wenn der Bearbeitungsthread im WorkItem fertig ist, muss er die Methode completeWorkItem des StatefulKnowledgeSessionWrapper der Engine mitteilen, dass der Prozessschritt beendet ist.

Mittels dieser Methode  ist eine echte Parallelisierung des Drools Workflow möglich.

Ist ein Drools Workflow theradsafe?

Bei der Implementation eines Wokflows mit Drools (jBPM) entstand das Problem, dass es zu zufälligen Deadlocks im Workflow kam. Diese traten selten und unregelmäßig auf. Es kam schnell der Verdacht auf, dass es durch Konflikte zwischen verschiedenen Threads zu diesem Verhalten kam.

Also musste der Workflow durch einen synchronisierten Wrapper threadsafe gemacht werden:

public class DroolsWrapper {

  private KnowledgeBase kbase;


  public DroolsWrapper() throws FctyException {
    // Knowledegebase laden
    kbase = .....
  }

  public StatefulKnowledgeSessionWrapper newStatefulKnowledgeSession() {
    return new StatefulKnowledgeSessionWrapper();
  }

  public class StatefulKnowledgeSessionWrapper {

    private StatefulKnowledgeSession kSession;

    public StatefulKnowledgeSessionWrapper(KnowledgeBase kbase) {
      kSession = kbase.newStatefulKnowledgeSession();
    }

    public synchronized void dispose() {
      kSession.dispose();
    }

    public synchronized WorkItemManager getWorkItemManager() {
      return kSession.getWorkItemManager();
    }

    public synchronized void setGlobal(String string, Object obj) {
      kSession.setGlobal(string, obj);
    }

    public synchronized ProcessInstance startProcess(String string) {
      return kSession.startProcess(string);
    }

    public synchronized FactHandle insert(Object obj) {
      logger.debug(obj.toString());
      return kSession.insert(obj);
    }

    public synchronized int fireAllRules() {
      return kSession.fireAllRules();
    }

    public synchronized QueryResults getQueryResults(String string) {
      return kSession.getQueryResults(string);
    }

    public synchronized Object getGlobal(String string) {
      return kSession.getGlobal(string);
    }

    public synchronized void completeWorkItem(WorkItem workItem, Map results) {
       kSession.getWorkItemManager().completeWorkItem(workItem.getId(), results);
    }

    public synchronized void abortWorkItem(WorkItem workItem) {
      kSession.getWorkItemManager().abortWorkItem(workItem.getId());
    }

    public synchronized int getPrcState(ProcessInstance prc) {
      return prc.getState();
    }

    public synchronized void completeWorkItem(WorkItem workItem, Exception e) {
      // complete with a Exception
      Map results = new HashMap();
      results.put("Exception", e);
      kSession.getWorkItemManager().completeWorkItem(workItem.getId(), results);
    }
  }
} 


Alle Aufrufe zu Knowledgesession und Prozess müssen durch diesen Wrapper gehen. Damit traten keine weiteren Fehler mehr auf.

Implementation java.util.Preferences mit JDBC-Backend

Das Java Interface java.util.Preferences bietet eine Schnittstelle, um Einstellungen in hierarchisch gegliederter Form zu speichern, aber leider keine brauchbare Implementation dieses Interfaces. Oft besteht auch die Anforderung, Einstellungen in einer Datenbank zu speichern. Genau für diesen Zweck kann die folgende Impelementierung eingesetzt werden. http://code.google.com/p/java-util-prefs-jdbc-backend/.

Freitag, 5. November 2010

Ein alternativer Weg, unter Oracle Tabellen im laufenden Betrieb neu aufzubauen

Manchmal müssen grosse Datenbestände nach bestimmten Regeln modifizert werden. Wenn die Datenmenge viele Millionen Records umfasst, so ist eine Änderung mit UPDATE Statements nicht mehr praktikabel. Ein effektiver Weg besteht darin, die Tabelle unter einem neuen Namen neu aufzubauen. Hierzu kann das "CREATE TABLE AS" Statement - auch CTAS genannt - genutzt werden. Ist die neue Tabelle fertig, so wird die Arbeitstabelle entfernt oder umbenannt. Die neue Tabelle nimmt dann den Platz der alten Tabelle ein.

-- Die neue Tabelle als temporäre Tabelle aufbauen.
-- Das Statement kann natürlich auch die Daten verändern.
CREATE TABLE temptable AS
  SELECT * FROM worktable;
  
-- Jetzt werden auf "temptable" die Indexes u.ä. angelegt

-- Die Arbeitstabelle als Backup sichern
ALTER TABLE worktable RENAME TO backup;

-- Die neu aufgebaute Tabelle aktivieren
ALTER TABLE temptable RENAME TO worktable;
  
-- hat alles geklappt, so wird das Backup gelöscht 
DROP TABLE backup;
Der eben aufgezeigte Weg ist eine einfache Methode, umfangreiche Datenbestände zu überarbeiten. Ein Nachteil ist, dass während der Umstellung keine Abfragen auf dieser Tabelle möglich sind. Auch Packages, die sich auf diese Tabelle beziehen, werden invalid.
Ist solch eine Modifikation im laufenden Betrieb nötig, so müssen andere Wege benutzt werden. Seit Oracle 9i existiert das "online table redefinition" Feature.
Eine Einschränkung ist es jedoch, dass die modifizierten Tabellen einen PK besitzen müssen, der nicht verändert werden kann. Sollen z.B. doppelte Zeilen zusammengefasst werden und sind vieleicht sogar noch weitere, abhängige Tabellen beteiligt, so stösst dieses Verfahren an seine Grenzen.
Hier bietet das "ALTER TABLE ... EXCHANGE PARTITION" Feature einen eleganten Ausweg. Dieses Feature ist zur Wartung partitionierter Tabellen gedacht und tauscht die Daten- und Indexsegmente zwischen einer Tabellenpartition und einer normalen Tabelle. Dieser Tausch kann während des normalen Betriebs durchgeführt werden, da keine Objekte in der Datenbank ungültig werden.

Wie kann nun dieser Mechanismus dabei helfen, eine normale Tabelle zu modifizieren? Die Hilfstabelle muss eine partitionierte Tabelle sein!

-- Die neue Tabelle als temporäre Tabelle aufbauen. 
-- Das Statement kann natürlich auch die Daten verändern.
CREATE TABLE temptable
  PARTITION BY RANGE(col1) 
  (partition P01 values less than ('99999999999999999') tablespace ) AS
   SELECT * FROM worktable;
   
-- Jetzt werden auf "temptable" die gleichen Indexes wie auf "worktable" 
-- als LOCAL angelegt

-- Nun tauschen die Tabellen ihren Inhalt
ALTER TABLE temptable EXCHANGE PARTITION P01
  WITH TABLE worktable INCLUDING INDEXES WITHOUT VALIDATION;

Natürlich dürfen während des Vorganges keine INSERT oder UPDATE Vorgänge auf der Arbeitstabelle stattfinden. Abfragen sind jedoch uneingeschränkt möglich und es werden auch keine abhängigen Objekte ungültig.

Ermittlung des Fortschritts eines INSERT-, UPDATE- oder ROLLBACK-Vorganges

Ermittlung des Fortschritts eines INSERT-, UPDATE- oder ROLLBACK-Vorganges

Der Fortschritt vieler SELECT Schritte kann in der Tabelle v$session_longops nachvollzogen werden. Bei INSERT-, UPDATE- oder ROLLBACK- Vorgänge ist der Fortschritt etwas schwieriger zu ermitteln, doch auch das ist möglich:
SELECT vs.sid,
       vs.serial#,
       vs.username,
       vs.status,
       vs.schemaname,
       vs.osuser,
       vs.machine,
       vs.program,
       vt.used_ublk as "UNDO-Blöcke",
       vt.used_urec as "UNDO-Rows"
  FROM v$session vs, v$transaction vt, v$sqlarea a
 WHERE vs.taddr = vt.addr
   AND a.hash_value(+) = decode(vs.sql_hash_value, 0,
       vs.prev_hash_value,
       vs.sql_hash_value)
   AND vs.sid = 29 <-- SID anpassen !!!

Das Ergebnis sieht wie das folgende Beispiel aus:

SIDSERIAL#USERNAMESTATUSSCHEMANAMEOSUSERMACHINEPROGRAMUNDO-BlöckeUNDO-Rows
2935108scott120


Dabei bedeutet die Spalte UNDO-Blöcke die Menge der UNDO-Blöcke, die von der Transaktion reserviert sind. Die Spalte UNDO-Rows zeigt die Anzahl der modifizierten Zeilen. Wenn eine Transaktion während des ROLLBACK Vorganges beobachtet wird, so fallen diese Werte. Wenn sie Null ereichen, so ist die Transaktion beendet.

Ermittlung des aktuellen Ausführungsplans eines SQL Statements

Wenn ein SQL Statement plötzlich eine lange Laufzeit hat, so muss der Ausführungsplan dieses Statements geprüft werden. Dazu ist die Session-ID nötig. Ist diese ermittelt, so kann mit dem folgenden Statement der Ausführungsplan angezeigt werden:


SELECT lpad(' ', 4 * LEVEL) || simple_plan simple_plan,
    object_name,
    trunc(bytes /1024/1024,2) AS size_mb,
    cost,
    cardinality
  FROM (SELECT p.id,
               p.parent_id,
               p.child_number,
               object_name,
               operation || ' (' || options || ')' simple_plan,
               p.bytes,
               p.cost,
               p.cardinality
          FROM v$sql_plan p, v$session s
         WHERE p.address = hextoraw(s.sql_address) AND
               p.hash_value = s.sql_hash_value AND
               s.sid = 29)  <-- SID anpassen !!!
 START WITH id = 0
CONNECT BY parent_id = PRIOR id AND
           child_number = PRIOR child_number
 ORDER BY child_number, id


Das Ergebnis sieht wie das folgende Beispiel aus:

SIMPLE_PLANOBJECT_NAMESIZE_MBCOSTCARDINALITY
SELECT STATEMENT ()
1095459
TABLE ACCESS (BY INDEX ROWID)
LARGE_TABLE39,8110954591346683
INDEX (RANGE SCAN)
PK_LARGE_TABLE15558334127055