savvato-software / tribe-app-backend


NOTES: Ping workflow #284

Open haxwell opened 1 month ago

haxwell commented 1 month ago

This is the result of a conversation with ChatGPT. It's not 100% how I'd want it, but I put it here now to think about at a later time.

Here’s how the full flow would work starting from the controller, based on your design and requirements:

1. User Presses Ping (Frontend): the app sends a POST request to /ping with the user's ID as a request parameter.

2. PingController (Backend):

@RestController
@RequestMapping("/ping")
public class PingController {

    @Autowired
    private PingService pingService;

    @PostMapping
    public ResponseEntity<PingResponse> pingUser(@RequestParam Long userId) {
        PingResponse response = pingService.handlePing(userId);
        return new ResponseEntity<>(response, HttpStatus.OK);
    }
}
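
The PingResponse type isn't shown anywhere in these notes. A minimal sketch of what it might carry, assuming a creation timestamp for the freshness check and the matched announcements; the field names here are placeholders, not the actual model:

import java.time.Instant;
import java.util.List;

// Hypothetical DTO/entity returned by the ping endpoint. Only userId, a
// created-at timestamp (used by isFresh() in PingService), and the matched
// announcements are implied by the flow; everything else is a guess.
public class PingResponse {

    private Long userId;
    private Instant createdAt;          // when this result was computed
    private List<Long> announcementIds; // announcements that matched the user's downstream network

    public Long getUserId() { return userId; }
    public Instant getCreatedAt() { return createdAt; }
    public List<Long> getAnnouncementIds() { return announcementIds; }

    // setters (or a constructor/builder) as needed
}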

3. PingService:

@Service
public class PingService {

    @Autowired
    private DownstreamService downstreamService;

    @Autowired
    private PingResponseRepository pingResponseRepository;

    public PingResponse handlePing(Long userId) {
        // Step 1: Check for fresh results
        PingResponse cachedResponse = pingResponseRepository.findByUserId(userId);
        if (cachedResponse != null && isFresh(cachedResponse)) {
            return cachedResponse;
        }

        // Step 2: Ask ServiceB (DownstreamService) for downstream data
        String downstreamHash = downstreamService.getDownstreamData(userId);

        if (downstreamHash == null) {
            // Downstream data not ready, user goes back in the queue for processing
            queueUserForPingProcessing(userId);
            return null; // Return null for now, response will be ready later
        }

        // Step 3: Process announcements
        List<Announcement> announcements = processAnnouncementsForUser(userId, downstreamHash);

        // Step 4: Create and return the ping response
        PingResponse response = createPingResponse(userId, announcements);
        pingResponseRepository.save(response);

        return response;
    }

    private void queueUserForPingProcessing(Long userId) {
        // Logic to add the user to the processing queue
    }

    private boolean isFresh(PingResponse response) {
        // Logic to check if the cached response is fresh
    }
}
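
Neither PingResponseRepository nor the freshness check is defined above. One way they could look, assuming Spring Data JPA and that PingResponse carries a createdAt timestamp; the ten-minute window is a placeholder:

import java.time.Duration;
import java.time.Instant;
import org.springframework.data.jpa.repository.JpaRepository;

// Hypothetical Spring Data repository backing the cached ping results.
public interface PingResponseRepository extends JpaRepository<PingResponse, Long> {
    PingResponse findByUserId(Long userId);  // derived query used by handlePing()
}

// One possible isFresh() implementation for PingService: reuse a cached result
// only if it was computed within the last ten minutes (placeholder window).
private boolean isFresh(PingResponse response) {
    Duration maxAge = Duration.ofMinutes(10);
    return Duration.between(response.getCreatedAt(), Instant.now()).compareTo(maxAge) < 0;
}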

4. DownstreamService (ServiceB):

@Service
public class DownstreamService {

    @Autowired
    private DownstreamHashRepository downstreamHashRepository;

    @Autowired
    private DownstreamProcessingQueue downstreamProcessingQueue;

    public String getDownstreamData(Long userId) {
        // Check if downstream data exists
        DownstreamHash downstreamHash = downstreamHashRepository.findByUserId(userId);

        if (downstreamHash != null) {
            return downstreamHash.getHash();
        } else {
            // If no downstream data, add user to the downstream processing queue
            downstreamProcessingQueue.addUserToQueue(userId);
            return null;
        }
    }
}
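
DownstreamProcessingQueue (the DPQ mentioned in the summary below) is used here and in the scheduled job but never shown. A minimal in-memory sketch; in practice this would probably be a database table so queued users survive a restart, and the batch size is a placeholder:

import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import org.springframework.stereotype.Component;

// Hypothetical in-memory downstream processing queue (DPQ). A LinkedHashSet
// keeps insertion order while preventing the same user from being queued twice.
@Component
public class DownstreamProcessingQueue {

    private static final int BATCH_SIZE = 50; // placeholder

    private final Set<Long> queuedUserIds = new LinkedHashSet<>();

    public synchronized void addUserToQueue(Long userId) {
        queuedUserIds.add(userId);
    }

    // Removes and returns up to BATCH_SIZE user IDs for the scheduled job.
    public synchronized List<Long> getNextBatch() {
        List<Long> batch = new ArrayList<>();
        Iterator<Long> it = queuedUserIds.iterator();
        while (it.hasNext() && batch.size() < BATCH_SIZE) {
            batch.add(it.next());
            it.remove();
        }
        return batch;
    }
}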

5. ServiceB Scheduled Job:

@Scheduled(fixedRate = 60000) // Runs every minute
public void processDownstreamQueue() {
    List<Long> userIdsToProcess = downstreamProcessingQueue.getNextBatch();

    for (Long userId : userIdsToProcess) {
        Set<Long> downstreamUsers = calculateDownstreamUsers(userId);
        String downstreamHash = generateDownstreamHash(downstreamUsers);

        // Save downstream data
        saveDownstreamData(userId, downstreamHash, downstreamUsers);
    }
}

private Set<Long> calculateDownstreamUsers(Long userId) {
    // Logic to calculate downstream connections recursively
}

private void saveDownstreamData(Long userId, String downstreamHash, Set<Long> downstreamUsers) {
    // Store downstream_hash and related user IDs
    downstreamHashRepository.save(new DownstreamHash(userId, downstreamHash));

    // Save individual downstream connections
    for (Long downstreamUserId : downstreamUsers) {
        downstreamConnectionsRepository.save(new DownstreamConnection(downstreamHash, downstreamUserId));
    }
}
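
generateDownstreamHash is called above but not defined. One way to derive it, assuming the hash should be stable for a given set of downstream users: sort the IDs, join them, and SHA-256 the result.

import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical hash derivation: sorting first makes the hash independent of the
// order in which downstream users were discovered, so the same network always
// produces the same downstream_hash.
private String generateDownstreamHash(Set<Long> downstreamUsers) {
    String joined = downstreamUsers.stream()
            .sorted()
            .map(id -> Long.toString(id))
            .collect(Collectors.joining(","));
    try {
        byte[] bytes = MessageDigest.getInstance("SHA-256")
                .digest(joined.getBytes(StandardCharsets.UTF_8));
        StringBuilder hex = new StringBuilder();
        for (byte b : bytes) {
            hex.append(String.format("%02x", b));
        }
        return hex.toString();
    } catch (NoSuchAlgorithmException e) {
        throw new IllegalStateException("SHA-256 not available", e);
    }
}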

6. Processing Announcements:

public List<Announcement> processAnnouncementsForUser(Long userId, String downstreamHash) {
    List<Announcement> matchingAnnouncements = new ArrayList<>();

    List<Announcement> seekingAnnouncements = announcementRepository.findSeekingAnnouncements();

    for (Announcement announcement : seekingAnnouncements) {
        boolean isCached = announcementPingCacheRepository.existsByHashAndAnnouncementId(downstreamHash, announcement.getId());

        if (isCached) {
            // Use cached result
            if (announcementPingCacheRepository.isMatch(downstreamHash, announcement.getId())) {
                matchingAnnouncements.add(announcement);
            }
        } else {
            // Iterate over downstream users and check if they match
            List<Long> downstreamUsers = downstreamConnectionsRepository.findUsersByDownstreamHash(downstreamHash);
            if (isAnnouncementMatch(downstreamUsers, announcement)) {
                matchingAnnouncements.add(announcement);
                announcementPingCacheRepository.save(new AnnouncementPingCache(downstreamHash, announcement.getId(), true));
            } else {
                announcementPingCacheRepository.save(new AnnouncementPingCache(downstreamHash, announcement.getId(), false));
            }
        }
    }

    return matchingAnnouncements;
}

private boolean isAnnouncementMatch(List<Long> downstreamUsers, Announcement announcement) {
    for (Long downstreamUserId : downstreamUsers) {
        List<Long> downstreamTribes = userTribesRepository.findTribesByUserId(downstreamUserId);

        if (!Collections.disjoint(downstreamTribes, announcement.getTargetTribes())) {
            return true; // Match found
        }
    }
    return false; // No match found
}
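
The announcement ping cache is only referenced above. A sketch of the entity and repository it implies; all names are assumptions, and the jakarta vs. javax persistence imports depend on the Spring Boot version in use.

import java.util.Optional;
import org.springframework.data.jpa.repository.JpaRepository;
// plus jakarta.persistence.* (or javax.persistence.* on older Spring Boot) for the annotations

// Hypothetical cache entity: one row per (downstream hash, announcement) pair,
// recording whether that downstream network matched the announcement.
@Entity
public class AnnouncementPingCache {

    @Id
    @GeneratedValue
    private Long id;

    private String hash;          // the downstream_hash the result was computed for
    private Long announcementId;
    private boolean match;        // true if the downstream network matched

    protected AnnouncementPingCache() {} // for JPA

    public AnnouncementPingCache(String hash, Long announcementId, boolean match) {
        this.hash = hash;
        this.announcementId = announcementId;
        this.match = match;
    }

    public boolean isMatch() { return match; }
}

public interface AnnouncementPingCacheRepository extends JpaRepository<AnnouncementPingCache, Long> {
    // One query that covers both the "is it cached?" and "was it a match?" checks above.
    Optional<AnnouncementPingCache> findByHashAndAnnouncementId(String hash, Long announcementId);
}

With a finder like that, the loop in processAnnouncementsForUser could make one repository call per announcement instead of the separate existsByHashAndAnnouncementId(...) and isMatch(...) calls.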

Summary of the Flow:

  1. User triggers a ping on the frontend.
  2. PingController handles the request and forwards it to PingService.
  3. PingService checks for fresh results; if none exist, it calls DownstreamService (ServiceB).
  4. DownstreamService either provides existing downstream data or adds the user to the downstream processing queue (DPQ).
  5. Scheduled Job (ServiceB) processes the queue, calculating downstream users and storing them in the database.
  6. PingService processes announcements, checking the cache for existing matches or iterating over downstream users to find relevant announcements.
  7. The results are cached and returned to the user.