|
此文章由 DDD888 原创或转贴,不代表本站立场和观点,版权归 oursteps.com.au 和作者 DDD888 所有!转贴必须注明作者、出处和本声明,并保持内容完整
这是我过去写的项目里的一部分代码
/// Program entry point.
///
/// Reads the configuration, queues every symbol listed in the configured code
/// file onto an unbounded channel, then drives the worker tasks to completion
/// on a multi-threaded Tokio runtime and logs the total elapsed time.
///
/// # Errors
///
/// Returns an error when the configuration cannot be read, the code file
/// cannot be opened, or the Tokio runtime cannot be built.
fn main() -> Result<(), ApplicationError> {
    let (path, config) = read_config()?;
    info!("version 1.0.0.0 config file path {:?}", path);
    let start_time = Instant::now();
    let (sender_string, receiver_string) = unbounded();

    let code_file = File::open(&config.code_file_name)
        .report_context(ApplicationError::FileOpen(config.code_file_name.clone()))?;

    // One symbol per line; blank lines and lines starting with '#' are ignored.
    // Lines that cannot be read are skipped rather than treated as fatal.
    let symbols = BufReader::new(code_file).lines().filter_map(|line| {
        let trimmed = line.ok()?.trim().to_owned();
        if trimmed.is_empty() || trimmed.starts_with('#') {
            None
        } else {
            Some(trimmed)
        }
    });

    // The whole queue is filled before any worker starts: the workers drain it
    // with a non-blocking `try_recv` and stop as soon as it is empty.
    for symbol in symbols {
        sender_string
            .send(symbol)
            .expect("receiver is still held by main, so the channel cannot be disconnected");
    }

    let runtime = Builder::new_multi_thread()
        .enable_all()
        .thread_name("multi-thread")
        .worker_threads(config.worker_threads)
        .build()
        .report_context(ApplicationError::TokioRuntime)?;
    runtime.block_on(run_tasks(&config, &receiver_string));

    info!(
        "Time elapsed: {} milliseconds",
        start_time.elapsed().as_millis()
    );
    Ok(())
}
pub async fn run_tasks(config: &ApplicationConfig, receiver_string: &Receiver<String>) {
let tasks: Vec<_> = (0..config.task_count)
.map(|_| tokio::spawn(my_async_task(config.clone(), receiver_string.clone())))
.collect();
for task in tasks {
let _ = task.await;
}
}
pub async fn my_async_task(config: ApplicationConfig, the_receiver: Receiver<String>) {
while let Ok(symbol) = the_receiver.try_recv() {
match handle_product(&config, &symbol).await {
Ok(_) => {}
Err(the_error) => error!("{}", the_error),
}
}
}
/// Downloads the price history for `symbol`, runs the averaging calculations
/// and writes the result to `<download_files_folder>/<symbol>.csv`.
///
/// The request URL is built from `config.data_url` by substituting the
/// `{date_start}`, `{date_stop}` and `{symbol}` placeholders.
///
/// # Errors
///
/// Returns an error when the download, JSON parsing, calculations, row
/// formatting, string assembly or file write fails.
pub async fn handle_product(
    config: &ApplicationConfig,
    symbol: &str,
) -> Result<(), ApplicationError> {
    // Fixed start of the requested range: Unix timestamp for 2010-06-19 12:00:00 UTC.
    const DATE_START: i64 = 1_276_948_800;

    // NOTE: an on-disk cache of the raw response (read the file if it exists,
    // otherwise download and save it) was sketched here earlier but is
    // disabled; every call downloads the data again.
    let date_stop = get_today_unixtimestamp();
    let the_url = config
        .data_url
        .replace("{date_start}", &DATE_START.to_string())
        .replace("{date_stop}", &date_stop.to_string())
        .replace("{symbol}", symbol);
    info!("{}", the_url);
    let contents = download_url_as_string(&config.user_agent, &the_url).await?;

    let price_tick_vec = parse_json(&contents)?;
    let mut price_tick_vec_result = PriceTickAveragesVec::new(&price_tick_vec);
    price_tick_vec_result.calculations()?;

    // Append one formatted entry per element of the result array.
    let mut builder = Builder::default();
    for row in price_tick_vec_result.array.iter() {
        builder.append(row.to_string()?);
    }
    let csv = builder
        .string()
        .report_context(ApplicationError::StringBuilder)?;

    let data_file_path = format!("{}/{}.csv", config.download_files_folder, symbol);
    save_file(&data_file_path, &csv)
}
pub async fn download_url_as_string(
user_agent: &str,
url: &str,
) -> Result<String, ApplicationError> {
let client = Client::new();
let request_builder = client
.get(url.clone())
.header("User-Agent", user_agent)
.timeout(Duration::from_secs(6));
let request = request_builder
.build()
.report_context(ApplicationError: ownloadUrl)?;
//info!("Request Headers: {:#?}", request.headers());
//info!("Request Body: {:#?}", request.body());
let response = client
.execute(request)
.await
.report_context(ApplicationError: ownloadUrl)?;
response
.text()
.await
.report_context(ApplicationError: ownloadUrl)
}
pub fn parse_json(contents: &str) -> Result<Vec< riceTick>, ApplicationError> {
// Parse the JSON data into a serde_json::Value object
let json: Value = serde_json::from_str(&contents).report_context(ApplicationError::Json)?;
let json_result = &json["chart"]["result"];
if let Some(value) = json_result["error"].as_str() {
error!("{value}");
bail!(ApplicationError::Json);
}
let (timestamp_array, adjclose_array) = parse_json_timestamp_value(json_result)?;
get_price_timestamp_value_array(×tamp_array, &adjclose_array)
}
|
评分
-
查看全部评分
|