I'm working with Spring Data JPA and HTTP POST requests. I fetch the data row by row and post each row to an API, and this works fine. However, I now have a bulk amount of data (around 10 million records in my database), so I need to use multithreading. I'm new to Java and Spring. How do I implement this with 10 threads, each of which reads 1k rows at a time, in parallel?
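To put numbers on that, here is a rough sketch of the paging arithmetic in plain Java. The class `PagePlan` and its methods `pageCount` and `workerFor` are hypothetical names made up for illustration, and dealing pages out round-robin is only one assumed way to divide the work.

```java
public class PagePlan {

    /** Number of pages needed to cover totalRows with pages of pageSize rows (rounds up). */
    public static long pageCount(long totalRows, int pageSize) {
        return (totalRows + pageSize - 1) / pageSize;
    }

    /** 0-based worker that owns a page when pages are dealt out round-robin (an assumption). */
    public static int workerFor(long page, int workers) {
        return (int) (page % workers);
    }

    public static void main(String[] args) {
        long pages = pageCount(10_000_000L, 1_000);
        System.out.println(pages);             // 10000 pages of 1k rows
        System.out.println(pages / 10);        // 1000 pages per worker with 10 workers
        System.out.println(workerFor(25, 10)); // page 25 goes to worker 5
    }
}
```

One thing this ignores: the query filters on status 0 and every processed row is set to status 1, so fixed page numbers would probably shift while rows are being updated.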
AccessingDataJpaApplication class:
import java.util.Arrays;
import java.util.Date;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.web.client.RestTemplate;

@SpringBootApplication
public class AccessingDataJpaApplication implements CommandLineRunner {

    private static final Logger logger = LoggerFactory.getLogger(AccessingDataJpaApplication.class);

    @Autowired
    private Bulk_repositoryRepository bulk_repositoryRepository;

    public static void main(String[] args) {
        SpringApplication.run(AccessingDataJpaApplication.class);
    }

    @Override
    public void run(String... args) throws Exception {
        RestTemplate restTemplate = new RestTemplate();
        HttpHeaders headers = new HttpHeaders();
        headers.setAccept(Arrays.asList(MediaType.APPLICATION_JSON));
        headers.setBasicAuth("user", "pass");

        // Poll forever: fetch the unprocessed rows of active campaigns, post each one, sleep, repeat.
        while (true) {
            Date currentDate = new Date();
            logger.info("Just Started");
            for (Bulk_repository churnss : bulk_repositoryRepository.findAllByStatusAndCampTypeAndCampStartDateLessThanEqualAndCampEndDateGreaterThanEqual(0, 2, currentDate, currentDate)) {
                System.out.print(churnss);
                logger.info(churnss.toString());

                // Build and send one POST request per row.
                AddOfferRequest addOffer = new AddOfferRequest("113", churnss.getMsisdn(), churnss.getParam1());
                logger.info(addOffer.toString());
                HttpEntity<AddOfferRequest> entity = new HttpEntity<>(addOffer, headers);
                ResponseEntity<String> responseEntity = restTemplate.exchange(
                        "api link", HttpMethod.POST, entity, String.class);

                if (responseEntity.getStatusCode() == HttpStatus.OK) {
                    // Mark the row as processed so the next query no longer returns it.
                    String response = responseEntity.getBody();
                    churnss.setStatus(1);
                    churnss.setProcessDate(new Date());
                    churnss.setFulfilment_status(response);
                    logger.info(churnss.toString() + ", Response: " + response);
                    bulk_repositoryRepository.save(churnss);
                } else {
                    logger.warn("Record Id: " + churnss.getId() + ", Http Failed Response: " + responseEntity.getStatusCode());
                }
            }
            Thread.sleep(1000);
        }
    }
}
Bulk_repository class:
@Entity
@Table(name = "BULK_REPOSITORY")
public class Bulk_repository {

    @Id
    @GeneratedValue(strategy = GenerationType.AUTO)
    @Column(name = "id")
    private long id;

    @Column(name = "msisdn")
    private String msisdn;

    @Column(name = "camp_start_date")
    private Date campStartDate;

    @Column(name = "camp_end_date")
    private Date campEndDate;

    @Column(name = "camp_type")
    private int campType;

    @Column(name = "camp_cd")
    private String camp_cd;

    @Column(name = "status")
    private int status;

    @Column(name = "process_date")
    private Date processDate;

    @Column(name = "entry_date")
    private Date entryDate;

    @Column(name = "entry_user")
    private String entry_user;

    @Column(name = "param1")
    private String param1;

    @Column(name = "param2")
    private String param2;

    @Column(name = "param3")
    private String param3;

    @Column(name = "param4")
    private String param4;

    @Column(name = "param5")
    private String param5;

    @Column(name = "error_desc")
    private String error_desc;

    @Column(name = "fulfilment_status")
    private int fulfilment_status;

    // getters, setters and toString() omitted
}
Bulk_repositoryRepository class:
public interface Bulk_repositoryRepository extends CrudRepository<Bulk_repository, Long> {

    // Rows with the given status and campaign type whose campaign window contains the given date.
    List<Bulk_repository> findAllByStatusAndCampTypeAndCampStartDateLessThanEqualAndCampEndDateGreaterThanEqual(int status, int campType, Date today0, Date today1);

    Bulk_repository findById(long id);
}
AddOfferRequest class:
public class AddOfferRequest {

    private String ChannelID = "113";
    private String MSISDN;
    private String ServiceID;

    public AddOfferRequest() {
    }

    public AddOfferRequest(String channelID, String mSISDN, String serviceID) {
        this.ChannelID = channelID;
        this.MSISDN = mSISDN;
        this.ServiceID = serviceID;
    }

    // getters, setters and toString() omitted
}
I have created an AsyncConfiguration class:
package com.example.accessingdatajpa;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;

@Configuration
@EnableAsync
public class AsyncConfiguration {

    private static final Logger LOGGER = LoggerFactory.getLogger(AsyncConfiguration.class);

    @Bean(name = "taskExecutor")
    public Executor taskExecutor() {
        LOGGER.debug("Creating Async Task Executor");
        final ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(2);
        executor.setMaxPoolSize(2);
        executor.setQueueCapacity(1000);
        executor.setThreadNamePrefix("CarThread-");
        executor.initialize();
        return executor;
    }
}
But so far I can't understand how to combine the findBy query and the HTTP POST with multithreading.
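The shape I think I am after can be sketched without Spring as follows. This is only a minimal sketch under assumptions: `ParallelPoster`, `chunk` and `processInParallel` are hypothetical names, the `rows` list stands in for the result of the repository query, and the `postRow` predicate stands in for the `restTemplate.exchange(...)` call plus the `save(...)`. It uses a plain `java.util.concurrent` fixed thread pool rather than the `taskExecutor` bean.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Predicate;

public class ParallelPoster {

    /** Splits rows into consecutive chunks of at most chunkSize elements. */
    public static <T> List<List<T>> chunk(List<T> rows, int chunkSize) {
        List<List<T>> chunks = new ArrayList<>();
        for (int from = 0; from < rows.size(); from += chunkSize) {
            int to = Math.min(from + chunkSize, rows.size());
            chunks.add(new ArrayList<>(rows.subList(from, to)));
        }
        return chunks;
    }

    /**
     * Runs postRow over every row, one task per chunk, on a fixed pool of
     * `threads` threads. Returns how many rows postRow accepted.
     */
    public static <T> int processInParallel(List<T> rows, int threads, int chunkSize,
                                            Predicate<T> postRow) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Callable<Integer>> tasks = new ArrayList<>();
            for (List<T> part : chunk(rows, chunkSize)) {
                tasks.add(() -> {
                    int ok = 0;
                    for (T row : part) {
                        if (postRow.test(row)) { // stand-in for HTTP POST + save
                            ok++;
                        }
                    }
                    return ok;
                });
            }
            int total = 0;
            // invokeAll blocks until every chunk has been processed.
            for (Future<Integer> result : pool.invokeAll(tasks)) {
                total += result.get();
            }
            return total;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for chunks", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("A chunk failed", e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // Dummy data: 10,000 "rows"; the dummy predicate accepts even numbers only.
        List<Integer> rows = new ArrayList<>();
        for (int i = 0; i < 10_000; i++) {
            rows.add(i);
        }
        int ok = processInParallel(rows, 10, 1_000, row -> row % 2 == 0);
        System.out.println(ok); // prints 5000
    }
}
```

With 10,000 rows, 10 threads and chunks of 1,000, each thread gets one chunk. Because `invokeAll` waits for all chunks, the next batch would only be fetched after the current rows have been marked as processed.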