Skip to content

eval_with_jobqueue

heartbeat_thread_func(worker_id, stop_event)

Background thread that sends heartbeats every 30 seconds.

Source code in OmniGibson/omnigibson/learning/eval_with_jobqueue.py
def heartbeat_thread_func(worker_id: str, stop_event: threading.Event, interval: float = 30.0) -> None:
    """Send heartbeats for ``worker_id`` until ``stop_event`` is set.

    Written to be used as the target of a background thread: it loops,
    calling ``send_heartbeat_all_jobs_and_resources`` once per iteration,
    and returns as soon as the stop event is observed.

    Args:
        worker_id: Identifier of the worker whose jobs and resources are
            kept alive.
        stop_event: Event that the owner sets to make this loop exit.
        interval: Seconds to wait between two heartbeats. Defaults to 30.
    """
    while not stop_event.is_set():
        send_heartbeat_all_jobs_and_resources(worker_id)
        # Wait on the event instead of sleeping so that setting the event
        # ends the pause right away rather than after a full interval.
        stop_event.wait(interval)

mark_job_as_completed(job_id, worker_id)

Mark a job as completed in the job queue.

Source code in OmniGibson/omnigibson/learning/eval_with_jobqueue.py
def mark_job_as_completed(job_id: str, worker_id: str, *, timeout: float = 30.0) -> None:
    """Mark a job as completed in the job queue.

    Sends a POST to ``{JOB_QUEUE_URL}/done/{job_id}`` with the worker id as
    the ``worker`` query parameter. This is best-effort: any failure
    (connection error, timeout, non-2xx status) is logged as an error and
    swallowed, so the caller never sees an exception.

    Args:
        job_id: Identifier of the job to mark as done.
        worker_id: Identifier of the worker reporting completion.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` may block forever on an unresponsive server.
    """
    try:
        response = requests.post(
            f"{JOB_QUEUE_URL}/done/{job_id}",
            params={"worker": worker_id},
            timeout=timeout,
        )
        response.raise_for_status()
        logger.info(f"Job {job_id} marked as completed")
    except Exception as e:
        # Deliberately broad: reporting completion must not crash the worker.
        logger.error(f"Failed to mark job as completed: {e}")

release_resource(resource_type, resource_idx, worker_id, job_id)

Release a reserved resource.

Source code in OmniGibson/omnigibson/learning/eval_with_jobqueue.py
def release_resource(
    resource_type: str, resource_idx: int, worker_id: str, job_id: str, *, timeout: float = 30.0
) -> None:
    """Release a reserved resource.

    Sends a POST to ``{JOB_QUEUE_URL}/resource/{resource_type}/release`` with
    ``worker``, ``job_id`` and ``resource_idx`` as query parameters. This is
    best-effort: any failure (connection error, timeout, non-2xx status) is
    logged as a warning and swallowed.

    Args:
        resource_type: Type name of the resource, used as a URL path segment.
        resource_idx: Index of the reserved resource to release.
        worker_id: Identifier of the worker holding the resource.
        job_id: Identifier of the job the resource was reserved for.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` may block forever on an unresponsive server.
    """
    try:
        response = requests.post(
            f"{JOB_QUEUE_URL}/resource/{resource_type}/release",
            params={"worker": worker_id, "job_id": job_id, "resource_idx": resource_idx},
            timeout=timeout,
        )
        response.raise_for_status()
        logger.info(f"Resource {resource_type}[{resource_idx}] released successfully")
    except Exception as e:
        # Deliberately broad: a failed release must not crash the worker.
        logger.warning(f"Failed to release resource: {e}")

request_job(worker_id)

Request a job from the job queue server.

Source code in OmniGibson/omnigibson/learning/eval_with_jobqueue.py
def request_job(worker_id: str, *, timeout: float = 30.0) -> "dict | None":
    """Request a job from the job queue server.

    Sends a GET to ``{JOB_QUEUE_URL}/job`` with the worker id as the
    ``worker`` query parameter and reads the ``"job"`` entry of the JSON
    response.

    Args:
        worker_id: Identifier of the worker asking for work.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` may block forever on an unresponsive server.

    Returns:
        The value stored under ``"job"`` in the response, or ``None`` when
        the response has no ``"job"`` key or when the request fails for any
        reason (the failure is logged as an error). Callers therefore cannot
        tell "no job available" from "request failed" by the return value.
    """
    try:
        response = requests.get(f"{JOB_QUEUE_URL}/job", params={"worker": worker_id}, timeout=timeout)
        response.raise_for_status()
        data = response.json()
        return data.get("job")  # None if no job is available
    except Exception as e:
        # Deliberately broad: covers network errors, bad status codes and
        # malformed JSON alike; all of them mean "no job this time".
        logger.error(f"Failed to request job: {e}")
        return None

reserve_resource(resource_type, worker_id, job_id)

Reserve a resource of the specified type for the job. Retries every 20 seconds until successful.

Source code in OmniGibson/omnigibson/learning/eval_with_jobqueue.py
def reserve_resource(
    resource_type: str,
    worker_id: str,
    job_id: str,
    *,
    retry_interval: float = 20.0,
    timeout: float = 60.0,
) -> dict:
    """Reserve a resource of the specified type for the job, retrying until successful.

    Repeatedly sends a POST to
    ``{JOB_QUEUE_URL}/resource/{resource_type}/acquire`` with ``worker`` and
    ``job_id`` as query parameters. Any failure (connection error, timeout,
    non-2xx status, undecodable body) is logged as a warning and followed by
    a pause of ``retry_interval`` seconds before the next attempt. The
    function only returns on success; there is no upper bound on the number
    of attempts.

    Args:
        resource_type: Type name of the resource, used as a URL path segment.
        worker_id: Identifier of the worker requesting the resource.
        job_id: Identifier of the job the resource is reserved for.
        retry_interval: Seconds to sleep between attempts. Defaults to 20.
        timeout: Seconds to wait for a single request. Without a timeout one
            hung request would stall the retry loop forever.

    Returns:
        The decoded JSON body of the successful response.
        NOTE(review): per the original inline comment this is expected to be
        ``{"index": ..., "resource": ...}`` - confirm against the server.
    """
    start_time = time.time()
    while True:
        try:
            response = requests.post(
                f"{JOB_QUEUE_URL}/resource/{resource_type}/acquire",
                params={"worker": worker_id, "job_id": job_id},
                timeout=timeout,
            )
            response.raise_for_status()
            logger.info(
                f"Resource {resource_type} reserved successfully after {(time.time() - start_time) / 60:.2f} minutes"
            )
            return response.json()
        except Exception as e:
            # ``:g`` renders the default 20.0 as "20", keeping the message
            # identical to the previous hard-coded text.
            logger.warning(
                f"Failed to reserve resource: {e}. Total wait time: {(time.time() - start_time) / 60:.2f} minutes. "
                f"Retrying in {retry_interval:g} seconds..."
            )
            time.sleep(retry_interval)

send_heartbeat_all_jobs_and_resources(worker_id)

Send heartbeat for all jobs and resources assigned to this worker.

Source code in OmniGibson/omnigibson/learning/eval_with_jobqueue.py
def send_heartbeat_all_jobs_and_resources(worker_id: str, *, timeout: float = 10.0) -> None:
    """Send heartbeat for all jobs and resources assigned to this worker.

    Sends a single POST to ``{JOB_QUEUE_URL}/heartbeat`` with the worker id
    as the ``worker`` query parameter. This is best-effort: any failure
    (connection error, timeout, non-2xx status, undecodable body) is logged
    as a warning and swallowed.

    Args:
        worker_id: Identifier of the worker sending the heartbeat.
        timeout: Seconds to wait for the server before giving up. A bound is
            needed because a request that never returns would block the
            caller and prevent any later heartbeat from being sent.
    """
    try:
        response = requests.post(f"{JOB_QUEUE_URL}/heartbeat", params={"worker": worker_id}, timeout=timeout)
        response.raise_for_status()
        logger.debug(f"Heartbeat sent successfully: {response.json()}")
    except Exception as e:
        # Deliberately broad: a missed heartbeat is retried on the next tick.
        logger.warning(f"Failed to send heartbeat: {e}")