Non Realtime RAN Intelligent Controller
NONRTRIC-996

DMaaPAdapter stops without retries if there is an issue sending to consumer


    • Type: Bug
    • Resolution: Done
    • Priority: Medium

      Issue:

      When there is an exception while sending a message to the consumer, the error “JobDataDistributor stopped jobId: {}” is logged [JobDataDistributor.start method] and no further messages are sent to the consumer.

      No retry is attempted here; the JobDataDistributor stops and does not handle any further messages.

       

      Probable fix:

      We have added a fix that introduces retries with a delay: when the DMaaP Adapter is unable to send a message to the consumer, it retries 3 times with a delay of 5 seconds between attempts.

      If it is still unable to send after that, the stream reaches this.onComplete() and the JobDataDistributor stops.

      The relevant code changes are shown below (originally highlighted in red); the modified file is also attached to make the small changes easy to identify. Kindly let us know whether you are fine with the fix, and if so, what the process is to propagate it.
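      As a self-contained illustration of the retry semantics proposed above (an initial attempt plus up to 3 retries, with a fixed delay between attempts), here is a plain-Java sketch that does not depend on Reactor. The class name RetryingSender and the Supplier-based send are hypothetical stand-ins for sendToClient; the delay is a parameter so the demo can use a short value instead of the proposed 5000 ms.

```java
import java.util.function.Supplier;

// Hypothetical helper illustrating the proposed retry policy: retry a failing
// send up to maxRetries times, sleeping delayMillis between attempts.
public class RetryingSender {

    public static <T> T sendWithRetries(Supplier<T> send, int maxRetries, long delayMillis)
            throws InterruptedException {
        RuntimeException lastFailure = null;
        // One initial attempt plus maxRetries retries, mirroring Retry.max(3).
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                return send.get();
            } catch (RuntimeException e) {
                lastFailure = e;
                if (attempt < maxRetries) {
                    Thread.sleep(delayMillis); // corresponds to introduceDelay()
                }
            }
        }
        // All attempts failed; propagating the error here corresponds to the
        // reactive stream terminating and the JobDataDistributor stopping.
        throw lastFailure;
    }

    public static void main(String[] args) throws InterruptedException {
        final int[] calls = {0};
        // A send that fails twice, then succeeds on the third attempt.
        String result = sendWithRetries(() -> {
            calls[0]++;
            if (calls[0] < 3) {
                throw new RuntimeException("consumer unreachable");
            }
            return "delivered";
        }, 3, 10L); // short delay for the demo; the ticket proposes 5 seconds
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```

      Note that Reactor also offers Retry.fixedDelay(long, Duration), which expresses "N retries, D apart" directly in the retry spec instead of sleeping in a doBeforeRetry callback.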

       

      Code changes :

      public void start(Flux<TopicListener.DataFromTopic> input) {
          logger.debug("Starting distribution, to topic: {}", jobGroup.getId());
          PmReportFilter filter = getPmReportFilter(this.jobGroup);

          final reactor.util.retry.RetrySpec retry =
                  reactor.util.retry.Retry.max(3).doBeforeRetry(output -> introduceDelay());

          if (filter == null || filter.getFilterData().getPmRopEndTime() == null) {
              this.subscription = filterAndBuffer(input, this.jobGroup) //
                      .doOnNext(filtered -> logger.debug("received data")) //
                      .flatMap(filtered -> this.sendToClient(filtered).retryWhen(retry)) //
                      .onErrorResume(this::handleError) //
                      .subscribe(this::handleSentOk, //
                              this::handleExceptionInStream, //
                              () -> logger.warn("JobDataDistributor stopped jobId: {}", jobGroup.getId()));
          }

          if (filter != null && filter.getFilterData().getPmRopStartTime() != null) {
              this.dataStore.createLock(collectHistoricalDataLockName()) //
                      .doOnNext(isLockGranted -> {
                          if (isLockGranted.booleanValue()) {
                              logger.debug("Checking historical PM ROP files, jobId: {}", this.jobGroup.getId());
                          } else {
                              logger.debug("Skipping check of historical PM ROP files, already done. jobId: {}",
                                      this.jobGroup.getId());
                          }
                      }) //
                      .filter(isLockGranted -> isLockGranted) //
                      .flatMapMany(b -> Flux.fromIterable(filter.getFilterData().getSourceNames())) //
                      .doOnNext(sourceName -> logger.debug("Checking source name: {}, jobId: {}", sourceName,
                              this.jobGroup.getId())) //
                      .flatMap(sourceName -> dataStore.listObjects(DataStore.Bucket.FILES, sourceName), 1) //
                      .filter(this::isRopFile) //
                      .filter(fileName -> filterStartTime(filter.getFilterData(), fileName)) //
                      .filter(fileName -> filterEndTime(filter.getFilterData(), fileName)) //
                      .map(this::createFakeEvent) //
                      .flatMap(data -> KafkaTopicListener.getDataFromFileIfNewPmFileEvent(data, this.jobGroup.getType(),
                              dataStore), 100) //
                      .map(jobGroup::filter) //
                      .map(this::gzip) //
                      .flatMap(filtered -> this.sendToClient(filtered).retryWhen(retry), 1) //
                      .onErrorResume(this::handleCollectHistoricalDataError) //
                      .doFinally(sig -> sendLastStoredRecord()) //
                      .subscribe();
          }
      }

       

      We have a doubt about the lines highlighted in green:

      1. What is the purpose of sendLastStoredRecord()? Do we need retry logic there as well?
      2. Why does the final subscribe() not take any handlers such as handleSentOk, handleExceptionInStream, and an onComplete callback? How will we know from the logs whether a job has stopped?
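      To illustrate why question 2 matters for observability, here is a minimal sketch using the JDK's java.util.concurrent.Flow API (a stand-in for Reactor, which follows the same Reactive Streams contract): a subscriber that implements onNext, onError, and onComplete makes both successful sends and stream termination visible in the logs, whereas a bare subscribe() with no callbacks logs nothing when the stream stops. The class names here are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

// Minimal JDK Flow example: a subscriber that logs items, errors, and
// completion, so a "job stopped" event is visible in the logs.
public class LoggingSubscriberDemo {

    static final class LoggingSubscriber implements Flow.Subscriber<String> {
        private final CountDownLatch done = new CountDownLatch(1);

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            subscription.request(Long.MAX_VALUE); // unbounded demand for the demo
        }

        @Override
        public void onNext(String item) {
            System.out.println("sent OK: " + item); // cf. handleSentOk
        }

        @Override
        public void onError(Throwable t) {
            System.out.println("stream failed: " + t.getMessage()); // cf. handleExceptionInStream
            done.countDown();
        }

        @Override
        public void onComplete() {
            System.out.println("JobDataDistributor stopped"); // cf. the onComplete log line
            done.countDown();
        }

        void await() throws InterruptedException {
            done.await();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        LoggingSubscriber subscriber = new LoggingSubscriber();
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            publisher.submit("record-1");
            publisher.submit("record-2");
        } // close() signals onComplete to the subscriber
        subscriber.await();
    }
}
```

      In Reactor the equivalent is the three-argument subscribe(onNext, onError, onComplete) overload already used in the first branch of start().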

       

      Thanks

      Guru

       


            aravind.est Aravindhan Ayyanathan
            JohnKeeney John Keeney