Repositories

Note!!!

I have changed my mind about something that I wrote earlier in this blog. I said we should not let the LLM write tests. However, as long as you supervise it closely, it is good to allow it to write tests. In the following work for the repositories, I will be using the LLM to write the tests first. However, I'll supervise it very closely, and likely will edit the tests myself.

I discussed the overall design with the LLM: there will be an infrastructure/neo4j package containing session_factory.py, unit_of_work.py, and repository.py (a base class/mixin for repositories), so the SentenceRepository will accept a session rather than a driver.

I then discussed how the test should use mocks, and told it to write the test so that I could have a look. It created: tests/dockb/repositories/test_sentence_repository.py
11 tests across 5 test classes:

| Class | Tests | What it verifies |
| --- | --- | --- |
| TestSaveNewSentence | 5 | Cypher has MATCH (p:Paragraph), MERGE (s:Sentence), UNWIND, MERGE (tok:Token), ON CREATE SET, ON MATCH SET, PART_OF; token fields serialized; order preserved via index; no orphan cleanup |
| TestSaveChangedSentence | 2 | Cypher includes OPTIONAL MATCH + DETACH DELETE orphan; only current tokens passed |
| TestSaveDeletedSentence | 1 | Cypher has DETACH DELETE with sentence_id param |
| TestSaveSkipStates | 2 | SYNC and _ states → session.run not called |
| TestSaveDirtySentence | 1 | dirty=True → ValueError raised, session.run not called |

SentenceRepository.save() must provide:

class SentenceRepository:
    def __init__(self, session: Session) -> None: ...

    def save(self, sentence: Sentence, paragraph_id: str) -> None:
        """
        - Raises ValueError if sentence.dirty
        - Returns immediately if state is _ or SYNC
        - NEW: single MERGE+UNWIND Cypher with ON CREATE SET + ON MATCH SET
        - CHANGED: same as NEW plus orphan cleanup (OPTIONAL MATCH ... DETACH DELETE)
        - DELETED: DETACH DELETE the sentence node
        """
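
To make the docstring above concrete, here is a minimal sketch of how save() might dispatch on the sentence state. It is not the class the LLM generated: the State enum, the Sentence stand-in, its field names, and the Cypher text (including the id, text and index properties) are all assumptions made for illustration.

```python
from dataclasses import dataclass, field
from enum import Enum
from typing import Any


class State(Enum):
    """Hypothetical enum; the post only names _, SYNC, NEW, CHANGED, DELETED."""
    UNSET = "_"
    SYNC = "SYNC"
    NEW = "NEW"
    CHANGED = "CHANGED"
    DELETED = "DELETED"


@dataclass
class Sentence:
    """Stand-in for the real model; all field names are assumptions."""
    id: str
    state: State
    dirty: bool = False
    tokens: list[dict[str, Any]] = field(default_factory=list)


# Illustrative Cypher only; the property names are guesses.
UPSERT = """
MATCH (p:Paragraph {id: $paragraph_id})
MERGE (s:Sentence {id: $sentence_id})
MERGE (s)-[:PART_OF]->(p)
WITH s
UNWIND $tokens AS t
MERGE (tok:Token {id: t.id})
  ON CREATE SET tok.text = t.text, tok.index = t.index
  ON MATCH SET tok.text = t.text, tok.index = t.index
MERGE (tok)-[:PART_OF]->(s)
"""

ORPHAN_CLEANUP = """
WITH s, collect(tok.id) AS keep
OPTIONAL MATCH (orphan:Token)-[:PART_OF]->(s)
WHERE NOT orphan.id IN keep
DETACH DELETE orphan
"""

DELETE = "MATCH (s:Sentence {id: $sentence_id}) DETACH DELETE s"


class SentenceRepository:
    def __init__(self, session) -> None:
        # Any object with run(cypher, **params) works, e.g. a neo4j Session.
        self._session = session

    def save(self, sentence: Sentence, paragraph_id: str) -> None:
        if sentence.dirty:
            raise ValueError(f"sentence {sentence.id} is dirty")
        if sentence.state in (State.UNSET, State.SYNC):
            return  # nothing to write
        if sentence.state is State.DELETED:
            self._session.run(DELETE, sentence_id=sentence.id)
            return
        # NEW uses the upsert alone; CHANGED appends the orphan cleanup.
        cypher = UPSERT
        if sentence.state is State.CHANGED:
            cypher += ORPHAN_CLEANUP
        self._session.run(
            cypher,
            paragraph_id=paragraph_id,
            sentence_id=sentence.id,
            tokens=sentence.tokens,
        )
```

The dirty check comes first so that a dirty sentence raises regardless of its state, which matches the order of the bullets in the docstring.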

The extract_call helper in the test file expects session.run(cypher, **params_dict) — params as kwargs.
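
For readers who have not seen this pattern, a helper like that could look roughly as follows. This is a sketch, not the helper from the actual test file: its signature and the index parameter are assumptions. It relies only on unittest.mock from the standard library, where each recorded call exposes args and kwargs.

```python
from unittest.mock import MagicMock


def extract_call(session: MagicMock, index: int = 0) -> tuple[str, dict]:
    """Return (cypher, params) for the index-th session.run call.

    Assumes the repository calls session.run(cypher, **params), so the
    Cypher text is the first positional argument and the parameters
    arrive as keyword arguments.
    """
    call = session.run.call_args_list[index]
    cypher = call.args[0]
    params = dict(call.kwargs)
    return cypher, params


# Usage: a mocked session records the call without touching a database.
session = MagicMock()
session.run(
    "MATCH (s:Sentence {id: $sentence_id}) DETACH DELETE s",
    sentence_id="s1",
)
cypher, params = extract_call(session)
```

Passing parameters as kwargs keeps the assertion side simple: the test can compare params against a plain dict.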

I checked the test, asked for one change, and then told it to create the repository class for the Sentence model. It created a very tidy SentenceRepository class, so I moved on to asking it to create the tests and then the repository classes for the other models.

I started writing a description in the README.md in the src/dockb/infrastructure/neo4j/ directory and had a fairly long dialog with the LLM about it. It was very helpful, pointing out a few things before I got it right. The description follows:

UnitOfWork combined with AsynchronousReconstructor and the dirty flag

Things get complicated because the reconstruction must be done before saving to the database, and:

  • reconstruction takes time
  • saving to database takes time

(This is done asynchronously on purpose because these time-consuming things should not block the editor, and editing events should return quickly back to the FE.)

Because these asynchronous jobs are not super-quick, it's quite likely that another editing event will come in for the same model objects while the jobs are still in progress. This could make one of the models dirty again. If this happens during a reconstruct, that is OK: the reconstruct is interrupted and stopped, and replaced with a new reconstruct job. However, if it happens while saving to the database, then the save will throw.

If the UnitOfWork is constructed by the EventService when an edit event comes in, and committed once all the jobs related to that edit have completed, then it can be freed up after the commit. If it gets interrupted by an exception, however, we have a problem. The dirty model can't just be skipped, because a transaction will often contain many models that must be committed together. If one of the models is dirty, we have to handle it carefully. Note that the JobQueue is single-threaded: as long as the UnitOfWork commit is called from that thread, and no other thread ever performs mutating database accesses (which is the case), we can be certain that no other database modification is in progress.

Rather than creating a UnitOfWork per EventService function, we have a UnitOfWorkFactory, which has a get_unit_of_work() function. It does not necessarily return a new UnitOfWork each time; it creates one only when needed. In fact, it will always return the same UnitOfWork until that UnitOfWork's commit completes. The UnitOfWorkFactory will also have a SyncReconstructor, which it provides to each UnitOfWork it constructs. The UnitOfWork will use the SyncReconstructor when its flush_pending() method is called. The UnitOfWorkFactory will also have a SessionFactory, DocCache, and nlp.
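
The "same UnitOfWork until its commit completes" rule can be sketched as below. This is only an illustration under assumptions: the on_committed callback is a hypothetical way for the UnitOfWork to signal the factory, and the SyncReconstructor, SessionFactory, DocCache and nlp dependencies are left out.

```python
from typing import Callable, Optional


class UnitOfWork:
    def __init__(self, on_committed: Callable[[], None]) -> None:
        # on_committed is a hypothetical hook used to notify the factory.
        self._on_committed = on_committed
        self.models: list[object] = []

    def add(self, model: object) -> None:
        if model not in self.models:
            self.models.append(model)

    def commit(self) -> None:
        # The real version would run Cypher inside a transaction here.
        self._on_committed()


class UnitOfWorkFactory:
    def __init__(self) -> None:
        self._current: Optional[UnitOfWork] = None

    def get_unit_of_work(self) -> UnitOfWork:
        # Create a UnitOfWork only when there is no uncommitted one.
        if self._current is None:
            self._current = UnitOfWork(on_committed=self._release)
        return self._current

    def _release(self) -> None:
        # Called once a commit completes; the next call gets a fresh one.
        self._current = None


factory = UnitOfWorkFactory()
first = factory.get_unit_of_work()
same = factory.get_unit_of_work()
first.commit()
fresh = factory.get_unit_of_work()
```

Here first and same are the same object, while fresh is a new UnitOfWork because the commit released the previous one.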

The EventService function will synchronously:

  1. construct a CommitJob
  2. add each modified model to the CommitJob as it works on it
  3. make modifications to the models (this will cause jobs to be added to the JobQueue, and the models to become dirty)
  4. add the CommitJob to the queue
  5. return
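
The five synchronous steps above can be sketched as follows. Everything here is a simplified stand-in: the Model, ReconstructJob and EventService.on_edit names are assumptions, and a plain deque takes the place of the real JobQueue.

```python
from collections import deque


class Model:
    """Stand-in for a document model with a dirty flag."""
    def __init__(self, name: str) -> None:
        self.name = name
        self.dirty = False


class ReconstructJob:
    """Hypothetical job that would rebuild a model's semantics."""
    def __init__(self, model: Model) -> None:
        self.model = model


class CommitJob:
    def __init__(self) -> None:
        self.models: list[Model] = []

    def add(self, model: Model) -> None:
        if model not in self.models:
            self.models.append(model)


class EventService:
    def __init__(self, job_queue: deque) -> None:
        self._queue = job_queue

    def on_edit(self, models: list[Model]) -> None:
        commit_job = CommitJob()                       # step 1
        for model in models:
            commit_job.add(model)                      # step 2
            model.dirty = True                         # step 3: the edit dirties the model
            self._queue.append(ReconstructJob(model))  # and queues its reconstruction
        self._queue.append(commit_job)                 # step 4
        # step 5: return immediately; nothing slow has run yet


jobs: deque = deque()
sentence, paragraph = Model("sentence"), Model("paragraph")
EventService(jobs).on_edit([sentence, paragraph])
```

Because the CommitJob is appended last, a first-in-first-out queue runs it only after the reconstruct jobs queued by the same edit.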

What will happen asynchronously:

  1. models will have the semantics reconstructed
  2. each job will run.
  3. while this is happening, it is possible that one of the models will get zapped by another edit and become dirty
  4. finally the CommitJob will be run, it will:
    1. call the UnitOfWorkFactory to get_unit_of_work()
    2. add all the models to the UnitOfWork.
    3. The UnitOfWork will check whether any of its models are dirty. If so, it will throw; the CommitJob will catch the exception, return, and let the JobQueue continue processing
    4. The UnitOfWork will issue Cypher commands; if any throw, the CommitJob will return and let the JobQueue continue
    5. The UnitOfWork will commit, which signals the UnitOfWorkFactory to construct a new UnitOfWork the next time get_unit_of_work() is called

If the CommitJob exits early (because the UnitOfWork threw), the UnitOfWork retains its models, and they will be committed next time, by the next CommitJob.
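
The retry behaviour described above can be sketched like this. It is a simplified model, not the real implementation: DirtyModelError is a hypothetical exception type, the saved list stands in for issuing Cypher and committing a transaction, and only the dirty-model failure is handled (a real CommitJob would also have to catch database errors).

```python
from typing import Optional


class DirtyModelError(ValueError):
    """Hypothetical error raised when a dirty model reaches commit."""


class Model:
    def __init__(self, name: str, dirty: bool = False) -> None:
        self.name = name
        self.dirty = dirty


class UnitOfWork:
    def __init__(self, factory: "UnitOfWorkFactory") -> None:
        self._factory = factory
        self.models: list[Model] = []
        self.saved: list[str] = []

    def add(self, model: Model) -> None:
        if model not in self.models:
            self.models.append(model)

    def commit(self) -> None:
        dirty = [m.name for m in self.models if m.dirty]
        if dirty:
            raise DirtyModelError(f"dirty models: {dirty}")
        self.saved = [m.name for m in self.models]  # stands in for Cypher + commit
        self._factory.release()


class UnitOfWorkFactory:
    def __init__(self) -> None:
        self._current: Optional[UnitOfWork] = None

    def get_unit_of_work(self) -> UnitOfWork:
        if self._current is None:
            self._current = UnitOfWork(self)
        return self._current

    def release(self) -> None:
        self._current = None


class CommitJob:
    def __init__(self, factory: UnitOfWorkFactory, models: list[Model]) -> None:
        self._factory = factory
        self._models = models

    def run(self) -> bool:
        uow = self._factory.get_unit_of_work()
        for model in self._models:
            uow.add(model)
        try:
            uow.commit()
        except DirtyModelError:
            return False  # models stay in the UnitOfWork for the next CommitJob
        return True


factory = UnitOfWorkFactory()
a, b, c = Model("a"), Model("b", dirty=True), Model("c")
first_ok = CommitJob(factory, [a, b]).run()   # b is dirty, so this exits early
pending = factory.get_unit_of_work()
b.dirty = False                               # e.g. a later reconstruct finished
second_ok = CommitJob(factory, [c]).run()     # commits a, b and c together
```

The second CommitJob receives the same UnitOfWork as the first, so the models left over from the failed attempt are committed along with its own.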

However, what if there is no other CommitJob? The JobQueue will sense that nothing has happened in it for a short period of time, perhaps 0.5 seconds. When its idle timer triggers, it will call the on_idle() method on a listener, and the listener will check the UnitOfWorkFactory by calling its get_current_unit_of_work(). That function will return None if the previous UnitOfWork was committed successfully. If it does not return None, then the pending UnitOfWork will have its flush_pending() method called, which will synchronously reconstruct any dirty models and then commit. The "listener" in this case is the IdleFlushListener, which will be created from a factory.
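
One possible way to get an idle signal is to wait on the queue with a timeout, as in the sketch below. This is an assumption about the mechanism, not the project's JobQueue: the step() method, the set_pending() helper and the callable jobs are invented for the example, and clearing the dirty flag stands in for the SyncReconstructor.

```python
import queue
from typing import Callable, Optional


class Model:
    def __init__(self, dirty: bool = False) -> None:
        self.dirty = dirty


class UnitOfWork:
    def __init__(self, models: list[Model], on_committed: Callable[[], None]) -> None:
        self.models = models
        self.committed = False
        self._on_committed = on_committed

    def flush_pending(self) -> None:
        for model in self.models:
            if model.dirty:
                model.dirty = False  # stands in for a synchronous reconstruct
        self.committed = True        # stands in for the database commit
        self._on_committed()


class UnitOfWorkFactory:
    def __init__(self) -> None:
        self._current: Optional[UnitOfWork] = None

    def set_pending(self, models: list[Model]) -> UnitOfWork:
        # Hypothetical helper that simulates a UnitOfWork left uncommitted.
        self._current = UnitOfWork(models, on_committed=self._release)
        return self._current

    def get_current_unit_of_work(self) -> Optional[UnitOfWork]:
        return self._current

    def _release(self) -> None:
        self._current = None


class IdleFlushListener:
    def __init__(self, factory: UnitOfWorkFactory) -> None:
        self._factory = factory

    def on_idle(self) -> None:
        uow = self._factory.get_current_unit_of_work()
        if uow is not None:
            uow.flush_pending()


class JobQueue:
    def __init__(self, listener: IdleFlushListener, idle_seconds: float = 0.5) -> None:
        self._jobs: queue.Queue = queue.Queue()
        self._listener = listener
        self._idle_seconds = idle_seconds

    def put(self, job: Callable[[], None]) -> None:
        self._jobs.put(job)

    def step(self) -> None:
        """Run one job, or report idleness if none arrives in time."""
        try:
            job = self._jobs.get(timeout=self._idle_seconds)
        except queue.Empty:
            self._listener.on_idle()
            return
        job()


factory = UnitOfWorkFactory()
model = Model(dirty=True)
pending = factory.set_pending([model])
jobs = JobQueue(IdleFlushListener(factory), idle_seconds=0.01)
ran: list[str] = []
jobs.put(lambda: ran.append("job"))
jobs.step()   # a job was waiting, so no idle callback yet
jobs.step()   # queue is empty: the timeout fires and the pending work is flushed
```

In a real worker, step() would be called in a loop on the single JobQueue thread, which keeps the flush on the same thread as every other database write.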

I told it to use this description, write the tests, and then stop and wait for me. It wrote a lot of tests, "Goodbye". I have some work to do... 55 tests created: